Add cache for api version

Expose runtime interface
This commit is contained in:
harry 2016-02-26 17:06:26 +08:00 committed by Harry Zhang
parent b4244a079f
commit f9e2f522b4
3 changed files with 125 additions and 29 deletions

View File

@ -55,8 +55,10 @@ type Runtime interface {
// Version returns the version information of the container runtime.
Version() (Version, error)
// APIVersion returns the API version information of the container
// runtime. This may be different from the runtime engine's version.
// APIVersion returns the cached API version information of the container
// runtime. Implementation is expected to update this cache periodically.
// This may be different from the runtime engine's version.
// TODO(random-liu): We should fold this into Version()
APIVersion() (Version, error)
// Status returns error if the runtime is unhealthy; nil otherwise.

View File

@ -55,6 +55,7 @@ import (
"k8s.io/kubernetes/pkg/util/procfs"
utilruntime "k8s.io/kubernetes/pkg/util/runtime"
utilstrings "k8s.io/kubernetes/pkg/util/strings"
"k8s.io/kubernetes/pkg/util/wait"
)
const (
@ -158,6 +159,9 @@ type DockerManager struct {
// A false value means the kubelet just backs off from setting it,
// it might already be true.
configureHairpinMode bool
// The api version cache of docker daemon.
versionCache *VersionCache
}
// A subset of the pod.Manager interface extracted for testing purposes.
@ -235,6 +239,7 @@ func NewDockerManager(
cpuCFSQuota: cpuCFSQuota,
enableCustomMetrics: enableCustomMetrics,
configureHairpinMode: hairpinMode,
versionCache: NewVersionCache(),
}
dm.runner = lifecycle.NewHandlerRunner(httpClient, dm, dm)
if serializeImagePulls {
@ -249,6 +254,21 @@ func NewDockerManager(
optf(dm)
}
apiVersion, err := dm.APIVersion()
if err != nil {
glog.Errorf("Failed to get api version from docker %v", err)
}
daemonVersion, err := dm.Version()
if err != nil {
glog.Errorf("Failed to get daemon version from docker %v", err)
}
// Update version cache periodically
go wait.Until(func() {
dm.versionCache.Update(dm.machineInfo.MachineID, apiVersion, daemonVersion)
}, 5*time.Second, wait.NeverStop)
return dm
}
@ -1437,17 +1457,7 @@ func (dm *DockerManager) applyOOMScoreAdj(container *api.Container, containerInf
}
return err
}
// Set OOM score of the container based on the priority of the container.
// Processes in lower-priority pods should be killed first if the system runs out of memory.
// The main pod infrastructure container is considered high priority, since if it is killed the
// whole pod will die.
// TODO: Cache this value.
var oomScoreAdj int
if containerInfo.Name == PodInfraContainerName {
oomScoreAdj = qos.PodInfraOOMAdj
} else {
oomScoreAdj = qos.GetContainerOOMScoreAdjust(container, int64(dm.machineInfo.MemoryCapacity))
}
oomScoreAdj := dm.calculateOomScoreAdj(container)
if err = dm.oomAdjuster.ApplyOOMScoreAdjContainer(cgroupName, oomScoreAdj, 5); err != nil {
if err == os.ErrNotExist {
// Container exited. We cannot do anything about it. Ignore this error.
@ -1482,17 +1492,7 @@ func (dm *DockerManager) runContainerInPod(pod *api.Pod, container *api.Containe
utsMode = namespaceModeHost
}
// Set OOM score of the container based on the priority of the container.
// Processes in lower-priority pods should be killed first if the system runs out of memory.
// The main pod infrastructure container is considered high priority, since if it is killed the
// whole pod will die.
var oomScoreAdj int
if container.Name == PodInfraContainerName {
oomScoreAdj = qos.PodInfraOOMAdj
} else {
oomScoreAdj = qos.GetContainerOOMScoreAdjust(container, int64(dm.machineInfo.MemoryCapacity))
}
oomScoreAdj := dm.calculateOomScoreAdj(container)
id, err := dm.runContainer(pod, container, opts, ref, netMode, ipcMode, utsMode, pidMode, restartCount, oomScoreAdj)
if err != nil {
@ -1566,20 +1566,52 @@ func (dm *DockerManager) applyOOMScoreAdjIfNeeded(container *api.Container, cont
return nil
}
// Check current docker API version against expected version.
func (dm *DockerManager) calculateOomScoreAdj(container *api.Container) int {
// Set OOM score of the container based on the priority of the container.
// Processes in lower-priority pods should be killed first if the system runs out of memory.
// The main pod infrastructure container is considered high priority, since if it is killed the
// whole pod will die.
var oomScoreAdj int
if container.Name == PodInfraContainerName {
oomScoreAdj = qos.PodInfraOOMAdj
} else {
oomScoreAdj = qos.GetContainerOOMScoreAdjust(container, int64(dm.machineInfo.MemoryCapacity))
}
return oomScoreAdj
}
// getCachedApiVersion gets cached api version of docker runtime.
func (dm *DockerManager) getCachedApiVersion() (kubecontainer.Version, error) {
apiVersion, _, err := dm.versionCache.Get(dm.machineInfo.MachineID)
if err != nil {
glog.Errorf("Failed to get cached docker api version %v ", err)
}
// If we got nil apiVersion, try to get api version directly.
if apiVersion == nil {
apiVersion, err = dm.APIVersion()
if err != nil {
glog.Errorf("Failed to get docker api version directly %v ", err)
}
dm.versionCache.Update(dm.machineInfo.MachineID, apiVersion, nil)
}
return apiVersion, err
}
// checkDockerAPIVersion checks current docker API version against expected version.
// Return:
// 1 : newer than expected version
// -1: older than expected version
// 0 : same version
func (dm *DockerManager) checkDockerAPIVersion(expectedVersion string) int {
apiVersion, err := dm.APIVersion()
apiVersion, err := dm.getCachedApiVersion()
if err != nil {
glog.Errorf("failed to get current docker version - %v", err)
glog.Errorf("Failed to get cached docker api version %v ", err)
}
result, err := apiVersion.Compare(expectedVersion)
if err != nil {
glog.Errorf("failed to compare current docker version %v with OOMScoreAdj supported Docker version %q - %v",
glog.Errorf("Failed to compare current docker api version %v with OOMScoreAdj supported Docker version %q - %v",
apiVersion, expectedVersion, err)
}
return result

View File

@ -0,0 +1,62 @@
/*
Copyright 2016 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package dockertools
import (
"fmt"
"sync"
"github.com/golang/groupcache/lru"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
)
type VersionCache struct {
lock sync.RWMutex
cache *lru.Cache
}
// versionInfo caches api version and daemon version.
type versionInfo struct {
apiVersion kubecontainer.Version
version kubecontainer.Version
}
const maxVersionCacheEntries = 1000
func NewVersionCache() *VersionCache {
return &VersionCache{cache: lru.New(maxVersionCacheEntries)}
}
// Update updates cached versionInfo by using a unique string (e.g. machineInfo) as the key.
func (c *VersionCache) Update(key string, apiVersion kubecontainer.Version, version kubecontainer.Version) {
c.lock.Lock()
defer c.lock.Unlock()
c.cache.Add(key, versionInfo{apiVersion, version})
}
// Get gets cached versionInfo by using a unique string (e.g. machineInfo) as the key.
// It returns apiVersion first and followed by daemon version.
func (c *VersionCache) Get(key string) (kubecontainer.Version, kubecontainer.Version, error) {
c.lock.RLock()
defer c.lock.RUnlock()
value, ok := c.cache.Get(key)
if !ok {
return nil, nil, fmt.Errorf("Failed to get version info from cache by key: ", key)
}
versions := value.(versionInfo)
return versions.apiVersion, versions.version, nil
}