Merge pull request #21741 from resouer/oomscore

Automatic merge from submit-queue

Use OomScoreAdj in kubelet for newer docker api

fixes: #20121

Related: client side PR [pull 454](https://github.com/fsouza/go-dockerclient/pull/454)

Godeps has already been updated to `0099401a7342ad77e71ca9f9a57c5e72fb80f6b2`, which includes the client-side change. However, upgrading the Docker API version used by the kubelet seems too aggressive.
This commit is contained in:
k8s-merge-robot 2016-04-14 03:07:41 -07:00
commit df21974730
6 changed files with 203 additions and 18 deletions

View File

@ -55,8 +55,10 @@ type Runtime interface {
// Version returns the version information of the container runtime.
Version() (Version, error)
// APIVersion returns the API version information of the container
// runtime. This may be different from the runtime engine's version.
// APIVersion returns the cached API version information of the container
// runtime. Implementation is expected to update this cache periodically.
// This may be different from the runtime engine's version.
// TODO(random-liu): We should fold this into Version()
APIVersion() (Version, error)
// Status returns error if the runtime is unhealthy; nil otherwise.

View File

@ -402,6 +402,8 @@ func (f *FakeDockerClient) PullImage(opts docker.PullImageOptions, auth docker.A
}
// Version returns a pointer to the fake's VersionInfo together with whatever
// error popError yields for the "version" call. The fake's lock is held for
// the duration of the call.
func (f *FakeDockerClient) Version() (*docker.Env, error) {
	f.Lock()
	defer f.Unlock()

	info := &f.VersionInfo
	err := f.popError("version")
	return info, err
}

View File

@ -24,6 +24,7 @@ import (
"k8s.io/kubernetes/pkg/kubelet/network"
proberesults "k8s.io/kubernetes/pkg/kubelet/prober/results"
kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
"k8s.io/kubernetes/pkg/kubelet/util/cache"
"k8s.io/kubernetes/pkg/types"
"k8s.io/kubernetes/pkg/util/flowcontrol"
"k8s.io/kubernetes/pkg/util/oom"
@ -52,6 +53,9 @@ func NewFakeDockerManager(
burst, containerLogsDir, osInterface, networkPlugin, runtimeHelper, httpClient, &NativeExecHandler{},
fakeOOMAdjuster, fakeProcFs, false, imageBackOff, false, false, true)
dm.dockerPuller = &FakeDockerPuller{}
dm.versionCache = cache.NewVersionCache(func() (kubecontainer.Version, kubecontainer.Version, error) {
return dm.getVersionInfo()
})
return dm
}

View File

@ -46,6 +46,7 @@ import (
proberesults "k8s.io/kubernetes/pkg/kubelet/prober/results"
"k8s.io/kubernetes/pkg/kubelet/qos"
kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
"k8s.io/kubernetes/pkg/kubelet/util/cache"
"k8s.io/kubernetes/pkg/kubelet/util/format"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/securitycontext"
@ -62,6 +63,8 @@ const (
minimumDockerAPIVersion = "1.20"
dockerv110APIVersion = "1.21"
// ndots specifies the minimum number of dots that a domain name must contain for the resolver to consider it as FQDN (fully-qualified)
// we want to able to consider SRV lookup names like _dns._udp.kube-dns.default.svc to be considered relative.
// hence, setting ndots to be 5.
@ -156,6 +159,9 @@ type DockerManager struct {
// A false value means the kubelet just backs off from setting it,
// it might already be true.
configureHairpinMode bool
// The api version cache of docker daemon.
versionCache *cache.VersionCache
}
// A subset of the pod.Manager interface extracted for testing purposes.
@ -247,6 +253,15 @@ func NewDockerManager(
optf(dm)
}
// initialize versionCache with an updater
dm.versionCache = cache.NewVersionCache(func() (kubecontainer.Version, kubecontainer.Version, error) {
return dm.getVersionInfo()
})
// update version cache periodically.
if dm.machineInfo != nil {
dm.versionCache.UpdateCachePeriodly(dm.machineInfo.MachineID)
}
return dm
}
@ -503,7 +518,8 @@ func (dm *DockerManager) runContainer(
ipcMode string,
utsMode string,
pidMode string,
restartCount int) (kubecontainer.ContainerID, error) {
restartCount int,
oomScoreAdj int) (kubecontainer.ContainerID, error) {
dockerName := KubeletContainerName{
PodFullName: kubecontainer.GetPodFullName(pod),
@ -584,6 +600,14 @@ func (dm *DockerManager) runContainer(
SecurityOpt: securityOpts,
}
// If current api version is newer than docker 1.10 requested, set OomScoreAdj to HostConfig
result, err := dm.checkDockerAPIVersion(dockerv110APIVersion)
if err != nil {
glog.Errorf("Failed to check docker api version: %v", err)
} else if result >= 0 {
hc.OomScoreAdj = oomScoreAdj
}
if dm.cpuCFSQuota {
// if cpuLimit.Amount is nil, then the appropriate default value is returned to allow full usage of cpu resource.
cpuQuota, cpuPeriod := milliCPUToQuota(cpuLimit.MilliValue())
@ -1429,17 +1453,7 @@ func (dm *DockerManager) applyOOMScoreAdj(container *api.Container, containerInf
}
return err
}
// Set OOM score of the container based on the priority of the container.
// Processes in lower-priority pods should be killed first if the system runs out of memory.
// The main pod infrastructure container is considered high priority, since if it is killed the
// whole pod will die.
// TODO: Cache this value.
var oomScoreAdj int
if containerInfo.Name == PodInfraContainerName {
oomScoreAdj = qos.PodInfraOOMAdj
} else {
oomScoreAdj = qos.GetContainerOOMScoreAdjust(container, int64(dm.machineInfo.MemoryCapacity))
}
oomScoreAdj := dm.calculateOomScoreAdj(container)
if err = dm.oomAdjuster.ApplyOOMScoreAdjContainer(cgroupName, oomScoreAdj, 5); err != nil {
if err == os.ErrNotExist {
// Container exited. We cannot do anything about it. Ignore this error.
@ -1473,7 +1487,10 @@ func (dm *DockerManager) runContainerInPod(pod *api.Pod, container *api.Containe
if usesHostNetwork(pod) {
utsMode = namespaceModeHost
}
id, err := dm.runContainer(pod, container, opts, ref, netMode, ipcMode, utsMode, pidMode, restartCount)
oomScoreAdj := dm.calculateOomScoreAdj(container)
id, err := dm.runContainer(pod, container, opts, ref, netMode, ipcMode, utsMode, pidMode, restartCount, oomScoreAdj)
if err != nil {
return kubecontainer.ContainerID{}, fmt.Errorf("runContainer: %v", err)
}
@ -1512,9 +1529,12 @@ func (dm *DockerManager) runContainerInPod(pod *api.Pod, container *api.Containe
return kubecontainer.ContainerID{}, fmt.Errorf("can't get init PID for container %q", id)
}
if err := dm.applyOOMScoreAdj(container, containerInfo); err != nil {
return kubecontainer.ContainerID{}, fmt.Errorf("failed to apply oom-score-adj to container %q- %v", err, containerInfo.Name)
// Check if current docker version is higher than 1.10. Otherwise, we have to apply OOMScoreAdj instead of using docker API.
err = dm.applyOOMScoreAdjIfNeeded(container, containerInfo)
if err != nil {
return kubecontainer.ContainerID{}, err
}
// The addNDotsOption call appends the ndots option to the resolv.conf file generated by docker.
// This resolv.conf file is shared by all containers of the same pod, and needs to be modified only once per pod.
// we modify it when the pause container is created since it is the first container created in the pod since it holds
@ -1529,6 +1549,69 @@ func (dm *DockerManager) runContainerInPod(pod *api.Pod, container *api.Containe
return id, err
}
// applyOOMScoreAdjIfNeeded applies the OOM score adjustment to an already
// started container via applyOOMScoreAdj, but only when the docker API version
// is older than dockerv110APIVersion. For newer API versions it does nothing.
//
// It returns an error if the API version cannot be checked or if applying the
// adjustment fails.
func (dm *DockerManager) applyOOMScoreAdjIfNeeded(container *api.Container, containerInfo *docker.Container) error {
	// Compare current API version with expected api version.
	result, err := dm.checkDockerAPIVersion(dockerv110APIVersion)
	if err != nil {
		return fmt.Errorf("Failed to check docker api version: %v", err)
	}
	// Same or newer API version: nothing to do here.
	if result >= 0 {
		return nil
	}
	// If current api version is older than OOMScoreAdj requested, use the old way.
	if err := dm.applyOOMScoreAdj(container, containerInfo); err != nil {
		// Argument order must match the verbs: container name for %q, error for %v.
		return fmt.Errorf("Failed to apply oom-score-adj to container %q- %v", containerInfo.Name, err)
	}
	return nil
}
// calculateOomScoreAdj picks the OOM score adjustment for a container based on
// its priority. Processes in lower-priority pods should be killed first if the
// system runs out of memory.
func (dm *DockerManager) calculateOomScoreAdj(container *api.Container) int {
	// The main pod infrastructure container is considered high priority, since
	// if it is killed the whole pod will die.
	if container.Name == PodInfraContainerName {
		return qos.PodInfraOOMAdj
	}
	// Every other container gets a score derived from the container spec and
	// the machine's memory capacity.
	memoryCapacity := int64(dm.machineInfo.MemoryCapacity)
	return qos.GetContainerOOMScoreAdjust(container, memoryCapacity)
}
// getCachedVersionInfo gets cached version info of docker runtime, keyed by the
// machine ID. It returns the API version first, then the daemon version.
//
// On a cache miss (or if either cached version is nil) it refreshes the cache
// once and reads it again, so the caller receives the freshly fetched versions
// instead of the stale miss. A non-nil error is returned when no usable entry
// is available even after the refresh; in that case both versions are nil.
func (dm *DockerManager) getCachedVersionInfo() (kubecontainer.Version, kubecontainer.Version, error) {
	key := dm.machineInfo.MachineID
	apiVersion, daemonVersion, err := dm.versionCache.Get(key)
	if err != nil {
		glog.Errorf("Failed to get cached docker api version %v ", err)
	}
	// If we got nil versions, try to update version info and re-read the cache.
	if apiVersion == nil || daemonVersion == nil {
		dm.versionCache.Update(key)
		apiVersion, daemonVersion, err = dm.versionCache.Get(key)
	}
	if err != nil {
		return nil, nil, err
	}
	// Never hand nil versions back with a nil error: callers invoke methods on them.
	if apiVersion == nil || daemonVersion == nil {
		return nil, nil, fmt.Errorf("docker version info for key %q is not available", key)
	}
	return apiVersion, daemonVersion, nil
}
// checkDockerAPIVersion compares the cached docker API version with
// expectedVersion.
// Return:
// 1 : current version is newer than expectedVersion
// -1: current version is older than expectedVersion
// 0 : both versions are the same
// When the cached version cannot be obtained or the comparison fails, the
// result is 0 and the error is non-nil.
func (dm *DockerManager) checkDockerAPIVersion(expectedVersion string) (int, error) {
	current, _, cacheErr := dm.getCachedVersionInfo()
	if cacheErr != nil {
		return 0, cacheErr
	}

	cmp, cmpErr := current.Compare(expectedVersion)
	if cmpErr == nil {
		return cmp, nil
	}
	return 0, fmt.Errorf("Failed to compare current docker api version %v with OOMScoreAdj supported Docker version %q - %v",
		current, expectedVersion, cmpErr)
}
func addNDotsOption(resolvFilePath string) error {
if len(resolvFilePath) == 0 {
glog.Errorf("ResolvConfPath is empty.")
@ -2112,3 +2195,17 @@ func (dm *DockerManager) GetPodStatus(uid types.UID, name, namespace string) (*k
podStatus.ContainerStatuses = containerStatuses
return podStatus, nil
}
// getVersionInfo queries the docker runtime for its API version and its daemon
// version, in that order. If either query fails, both versions are returned as
// nil along with that error.
func (dm *DockerManager) getVersionInfo() (kubecontainer.Version, kubecontainer.Version, error) {
	apiVer, apiErr := dm.APIVersion()
	if apiErr != nil {
		return nil, nil, apiErr
	}

	daemonVer, daemonErr := dm.Version()
	if daemonErr != nil {
		return nil, nil, daemonErr
	}

	return apiVer, daemonVer, nil
}

View File

@ -1933,7 +1933,7 @@ func TestGetPodStatusNoSuchContainer(t *testing.T) {
},
}})
fakeDocker.Errors = map[string]error{"inspect": &docker.NoSuchContainer{}}
fakeDocker.InjectErrors(map[string]error{"inspect": &docker.NoSuchContainer{}})
runSyncPod(t, dm, fakeDocker, pod, nil, false)
// Verify that we will try to start new containers even if the inspections

80
pkg/kubelet/util/cache/version_cache.go vendored Normal file
View File

@ -0,0 +1,80 @@
/*
Copyright 2016 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cache
import (
"fmt"
"sync"
"time"
"github.com/golang/glog"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/util/wait"
)
// VersionCache stores container runtime version information by string key and
// refreshes entries through a caller-supplied updater function.
type VersionCache struct {
// lock guards cache; Update takes it for writing, Get for reading.
lock sync.RWMutex
// cache maps a key (e.g. a machine ID) to the last version info stored for it.
cache map[string]versionInfo
// updater fetches the current versions; Update stores its first result as
// the API version and its second as the daemon version.
updater func() (kubecontainer.Version, kubecontainer.Version, error)
}
// versionInfo caches api version and daemon version.
// Field order matters: Update builds this struct with a positional literal.
type versionInfo struct {
// apiVersion is the first value returned by the updater.
apiVersion kubecontainer.Version
// version is the second value returned by the updater (the daemon version).
version kubecontainer.Version
}
// maxVersionCacheEntries is presumably meant as an upper bound on the number of
// cached entries. NOTE(review): nothing in the visible code references or
// enforces it — confirm whether it is still needed.
const maxVersionCacheEntries = 1000
// NewVersionCache returns an empty VersionCache that uses f to fetch version
// information whenever Update is called.
func NewVersionCache(f func() (kubecontainer.Version, kubecontainer.Version, error)) *VersionCache {
	vc := new(VersionCache)
	vc.cache = make(map[string]versionInfo)
	vc.updater = f
	return vc
}
// Update calls the updater and stores its result under key (a unique string,
// e.g. machineInfo). If the updater fails, the error is logged and the cache is
// left untouched.
func (c *VersionCache) Update(key string) {
	apiVersion, daemonVersion, err := c.updater()
	if err != nil {
		glog.Errorf("Fail to get version info from container runtime: %v", err)
		return
	}

	// Only take the write lock once there is something to store.
	c.lock.Lock()
	defer c.lock.Unlock()
	c.cache[key] = versionInfo{
		apiVersion: apiVersion,
		version:    daemonVersion,
	}
}
// Get gets cached versionInfo by using a unique string (e.g. machineInfo) as the key.
// It returns apiVersion first and followed by daemon version.
// If no entry exists for key, both versions are nil and the error names the
// missing key.
func (c *VersionCache) Get(key string) (kubecontainer.Version, kubecontainer.Version, error) {
	c.lock.RLock()
	defer c.lock.RUnlock()
	value, ok := c.cache[key]
	if !ok {
		// The format string needs a verb for key; without one the message
		// ends in "%!(EXTRA string=...)".
		return nil, nil, fmt.Errorf("Failed to get version info from cache by key: %q", key)
	}
	return value.apiVersion, value.version, nil
}
// UpdateCachePeriodly starts a goroutine that hands a refresh of the entry for
// key to wait.Until with a one-minute period. wait.NeverStop is passed as the
// stop channel, so this method offers no way to stop the refresh.
func (c *VersionCache) UpdateCachePeriodly(key string) {
	refresh := func() {
		c.Update(key)
	}
	go wait.Until(refresh, time.Minute, wait.NeverStop)
}