From c31ec5607ac9a47c4ddf9ecbaa4c4c5bc4857f84 Mon Sep 17 00:00:00 2001 From: Harry Zhang Date: Mon, 14 Mar 2016 16:35:49 +0800 Subject: [PATCH] Refactor version cache into kubelet util --- pkg/kubelet/dockertools/fake_docker_client.go | 2 + pkg/kubelet/dockertools/fake_manager.go | 4 + pkg/kubelet/dockertools/manager.go | 81 ++++++++++--------- pkg/kubelet/dockertools/manager_test.go | 2 +- .../cache}/version_cache.go | 44 +++++++--- 5 files changed, 83 insertions(+), 50 deletions(-) rename pkg/kubelet/{dockertools => util/cache}/version_cache.go (60%) diff --git a/pkg/kubelet/dockertools/fake_docker_client.go b/pkg/kubelet/dockertools/fake_docker_client.go index 1a905cb90be..742cc6368b4 100644 --- a/pkg/kubelet/dockertools/fake_docker_client.go +++ b/pkg/kubelet/dockertools/fake_docker_client.go @@ -402,6 +402,8 @@ func (f *FakeDockerClient) PullImage(opts docker.PullImageOptions, auth docker.A } func (f *FakeDockerClient) Version() (*docker.Env, error) { + f.Lock() + defer f.Unlock() return &f.VersionInfo, f.popError("version") } diff --git a/pkg/kubelet/dockertools/fake_manager.go b/pkg/kubelet/dockertools/fake_manager.go index 73984f70d32..28fcffcd41e 100644 --- a/pkg/kubelet/dockertools/fake_manager.go +++ b/pkg/kubelet/dockertools/fake_manager.go @@ -24,6 +24,7 @@ import ( "k8s.io/kubernetes/pkg/kubelet/network" proberesults "k8s.io/kubernetes/pkg/kubelet/prober/results" kubetypes "k8s.io/kubernetes/pkg/kubelet/types" + "k8s.io/kubernetes/pkg/kubelet/util/cache" "k8s.io/kubernetes/pkg/types" "k8s.io/kubernetes/pkg/util/flowcontrol" "k8s.io/kubernetes/pkg/util/oom" @@ -52,6 +53,9 @@ func NewFakeDockerManager( burst, containerLogsDir, osInterface, networkPlugin, runtimeHelper, httpClient, &NativeExecHandler{}, fakeOOMAdjuster, fakeProcFs, false, imageBackOff, false, false, true) dm.dockerPuller = &FakeDockerPuller{} + dm.versionCache = cache.NewVersionCache(func() (kubecontainer.Version, kubecontainer.Version, error) { + return dm.getVersionInfo() + }) return dm } diff --git a/pkg/kubelet/dockertools/manager.go b/pkg/kubelet/dockertools/manager.go index f97c56bba5c..6b9699ba708 100644 --- a/pkg/kubelet/dockertools/manager.go +++ b/pkg/kubelet/dockertools/manager.go @@ -46,6 +46,7 @@ import ( proberesults "k8s.io/kubernetes/pkg/kubelet/prober/results" "k8s.io/kubernetes/pkg/kubelet/qos" kubetypes "k8s.io/kubernetes/pkg/kubelet/types" + "k8s.io/kubernetes/pkg/kubelet/util/cache" "k8s.io/kubernetes/pkg/kubelet/util/format" "k8s.io/kubernetes/pkg/runtime" "k8s.io/kubernetes/pkg/securitycontext" @@ -55,7 +56,6 @@ import ( "k8s.io/kubernetes/pkg/util/procfs" utilruntime "k8s.io/kubernetes/pkg/util/runtime" utilstrings "k8s.io/kubernetes/pkg/util/strings" - "k8s.io/kubernetes/pkg/util/wait" ) const ( @@ -161,7 +161,7 @@ type DockerManager struct { configureHairpinMode bool // The api version cache of docker daemon. - versionCache *VersionCache + versionCache *cache.VersionCache } // A subset of the pod.Manager interface extracted for testing purposes. @@ -239,7 +239,6 @@ func NewDockerManager( cpuCFSQuota: cpuCFSQuota, enableCustomMetrics: enableCustomMetrics, configureHairpinMode: hairpinMode, - versionCache: NewVersionCache(), } dm.runner = lifecycle.NewHandlerRunner(httpClient, dm, dm) if serializeImagePulls { @@ -254,21 +253,15 @@ func NewDockerManager( optf(dm) } - apiVersion, err := dm.APIVersion() - if err != nil { - glog.Errorf("Failed to get api version from docker %v", err) + // initialize versionCache with a updater + dm.versionCache = cache.NewVersionCache(func() (kubecontainer.Version, kubecontainer.Version, error) { + return dm.getVersionInfo() + }) + // update version cache periodically. + if dm.machineInfo != nil { + dm.versionCache.UpdateCachePeriodly(dm.machineInfo.MachineID) } - daemonVersion, err := dm.Version() - if err != nil { - glog.Errorf("Failed to get daemon version from docker %v", err) - } - - // Update version cache periodically - go wait.Until(func() { - dm.versionCache.Update(dm.machineInfo.MachineID, apiVersion, daemonVersion) - }, 5*time.Second, wait.NeverStop) - return dm } @@ -608,7 +601,10 @@ func (dm *DockerManager) runContainer( } // If current api version is newer than docker 1.10 requested, set OomScoreAdj to HostConfig - if dm.checkDockerAPIVersion(dockerv110APIVersion) >= 0 { + result, err := dm.checkDockerAPIVersion(dockerv110APIVersion) + if err != nil { + glog.Errorf("Failed to check docker api version: %v", err) + } else if result >= 0 { hc.OomScoreAdj = oomScoreAdj } @@ -1554,12 +1550,15 @@ func (dm *DockerManager) runContainerInPod(pod *api.Pod, container *api.Containe } func (dm *DockerManager) applyOOMScoreAdjIfNeeded(container *api.Container, containerInfo *docker.Container) error { - // Compare current API version with expected api version - result := dm.checkDockerAPIVersion(dockerv110APIVersion) + // Compare current API version with expected api version. + result, err := dm.checkDockerAPIVersion(dockerv110APIVersion) + if err != nil { + return fmt.Errorf("Failed to check docker api version: %v", err) + } // If current api version is older than OOMScoreAdj requested, use the old way. if result < 0 { if err := dm.applyOOMScoreAdj(container, containerInfo); err != nil { - return fmt.Errorf("failed to apply oom-score-adj to container %q- %v", err, containerInfo.Name) + return fmt.Errorf("Failed to apply oom-score-adj to container %q- %v", err, containerInfo.Name) } } @@ -1582,21 +1581,17 @@ func (dm *DockerManager) calculateOomScoreAdj(container *api.Container) int { return oomScoreAdj } -// getCachedApiVersion gets cached api version of docker runtime. -func (dm *DockerManager) getCachedApiVersion() (kubecontainer.Version, error) { - apiVersion, _, err := dm.versionCache.Get(dm.machineInfo.MachineID) +// getCachedVersionInfo gets cached version info of docker runtime. +func (dm *DockerManager) getCachedVersionInfo() (kubecontainer.Version, kubecontainer.Version, error) { + apiVersion, daemonVersion, err := dm.versionCache.Get(dm.machineInfo.MachineID) if err != nil { glog.Errorf("Failed to get cached docker api version %v ", err) } - // If we got nil apiVersion, try to get api version directly. - if apiVersion == nil { - apiVersion, err = dm.APIVersion() - if err != nil { - glog.Errorf("Failed to get docker api version directly %v ", err) - } - dm.versionCache.Update(dm.machineInfo.MachineID, apiVersion, nil) + // If we got nil versions, try to update version info. + if apiVersion == nil || daemonVersion == nil { + dm.versionCache.Update(dm.machineInfo.MachineID) } - return apiVersion, err + return apiVersion, daemonVersion, err } // checkDockerAPIVersion checks current docker API version against expected version. @@ -1604,17 +1599,17 @@ func (dm *DockerManager) getCachedApiVersion() (kubecontainer.Version, error) { // 1 : newer than expected version // -1: older than expected version // 0 : same version -func (dm *DockerManager) checkDockerAPIVersion(expectedVersion string) int { - apiVersion, err := dm.getCachedApiVersion() +func (dm *DockerManager) checkDockerAPIVersion(expectedVersion string) (int, error) { + apiVersion, _, err := dm.getCachedVersionInfo() if err != nil { - glog.Errorf("Failed to get cached docker api version %v ", err) + return 0, err } result, err := apiVersion.Compare(expectedVersion) if err != nil { - glog.Errorf("Failed to compare current docker api version %v with OOMScoreAdj supported Docker version %q - %v", + return 0, fmt.Errorf("Failed to compare current docker api version %v with OOMScoreAdj supported Docker version %q - %v", apiVersion, expectedVersion, err) } - return result + return result, nil } func addNDotsOption(resolvFilePath string) error { @@ -2200,3 +2195,17 @@ func (dm *DockerManager) GetPodStatus(uid types.UID, name, namespace string) (*k podStatus.ContainerStatuses = containerStatuses return podStatus, nil } + +// getVersionInfo returns apiVersion & daemonVersion of docker runtime +func (dm *DockerManager) getVersionInfo() (kubecontainer.Version, kubecontainer.Version, error) { + apiVersion, err := dm.APIVersion() + if err != nil { + return nil, nil, err + } + daemonVersion, err := dm.Version() + if err != nil { + return nil, nil, err + } + + return apiVersion, daemonVersion, nil +} diff --git a/pkg/kubelet/dockertools/manager_test.go b/pkg/kubelet/dockertools/manager_test.go index 089e6056498..0bf4997bbdd 100644 --- a/pkg/kubelet/dockertools/manager_test.go +++ b/pkg/kubelet/dockertools/manager_test.go @@ -1933,7 +1933,7 @@ func TestGetPodStatusNoSuchContainer(t *testing.T) { }, }}) - fakeDocker.Errors = map[string]error{"inspect": &docker.NoSuchContainer{}} + fakeDocker.InjectErrors(map[string]error{"inspect": &docker.NoSuchContainer{}}) runSyncPod(t, dm, fakeDocker, pod, nil, false) // Verify that we will try to start new contrainers even if the inspections diff --git a/pkg/kubelet/dockertools/version_cache.go b/pkg/kubelet/util/cache/version_cache.go similarity index 60% rename from pkg/kubelet/dockertools/version_cache.go rename to pkg/kubelet/util/cache/version_cache.go index 278e5e5c507..151889c0574 100644 --- a/pkg/kubelet/dockertools/version_cache.go +++ b/pkg/kubelet/util/cache/version_cache.go @@ -14,19 +14,23 @@ See the License for the specific language governing permissions and limitations under the License. */ -package dockertools +package cache import ( "fmt" "sync" + "time" + + "github.com/golang/glog" - "github.com/golang/groupcache/lru" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" + "k8s.io/kubernetes/pkg/util/wait" ) type VersionCache struct { - lock sync.RWMutex - cache *lru.Cache + lock sync.RWMutex + cache map[string]versionInfo + updater func() (kubecontainer.Version, kubecontainer.Version, error) } // versionInfo caches api version and daemon version. @@ -37,15 +41,24 @@ type versionInfo struct { const maxVersionCacheEntries = 1000 -func NewVersionCache() *VersionCache { - return &VersionCache{cache: lru.New(maxVersionCacheEntries)} +func NewVersionCache(f func() (kubecontainer.Version, kubecontainer.Version, error)) *VersionCache { + return &VersionCache{ + cache: map[string]versionInfo{}, + updater: f, + } } // Update updates cached versionInfo by using a unique string (e.g. machineInfo) as the key. -func (c *VersionCache) Update(key string, apiVersion kubecontainer.Version, version kubecontainer.Version) { - c.lock.Lock() - defer c.lock.Unlock() - c.cache.Add(key, versionInfo{apiVersion, version}) +func (c *VersionCache) Update(key string) { + apiVersion, daemonVersion, err := c.updater() + + if err != nil { + glog.Errorf("Fail to get version info from container runtime: %v", err) + } else { + c.lock.Lock() + defer c.lock.Unlock() + c.cache[key] = versionInfo{apiVersion, daemonVersion} + } } // Get gets cached versionInfo by using a unique string (e.g. machineInfo) as the key. @@ -53,10 +66,15 @@ func (c *VersionCache) Update(key string, apiVersion kubecontainer.Version, vers func (c *VersionCache) Get(key string) (kubecontainer.Version, kubecontainer.Version, error) { c.lock.RLock() defer c.lock.RUnlock() - value, ok := c.cache.Get(key) + value, ok := c.cache[key] if !ok { return nil, nil, fmt.Errorf("Failed to get version info from cache by key: ", key) } - versions := value.(versionInfo) - return versions.apiVersion, versions.version, nil + return value.apiVersion, value.version, nil +} + +func (c *VersionCache) UpdateCachePeriodly(key string) { + go wait.Until(func() { + c.Update(key) + }, 1*time.Minute, wait.NeverStop) }