diff --git a/pkg/kubelet/dockertools/fake_manager.go b/pkg/kubelet/dockertools/fake_manager.go index 73984f70d32..3d56a6af2d7 100644 --- a/pkg/kubelet/dockertools/fake_manager.go +++ b/pkg/kubelet/dockertools/fake_manager.go @@ -24,6 +24,7 @@ import ( "k8s.io/kubernetes/pkg/kubelet/network" proberesults "k8s.io/kubernetes/pkg/kubelet/prober/results" kubetypes "k8s.io/kubernetes/pkg/kubelet/types" + "k8s.io/kubernetes/pkg/kubelet/util/cache" "k8s.io/kubernetes/pkg/types" "k8s.io/kubernetes/pkg/util/flowcontrol" "k8s.io/kubernetes/pkg/util/oom" @@ -52,6 +53,14 @@ func NewFakeDockerManager( burst, containerLogsDir, osInterface, networkPlugin, runtimeHelper, httpClient, &NativeExecHandler{}, fakeOOMAdjuster, fakeProcFs, false, imageBackOff, false, false, true) dm.dockerPuller = &FakeDockerPuller{} + + // ttl of version cache is set to 0 so we always call version api directly in tests. + dm.versionCache = cache.NewObjectCache( + func() (interface{}, error) { + return dm.getVersionInfo() + }, + 0, + ) return dm } diff --git a/pkg/kubelet/dockertools/manager.go b/pkg/kubelet/dockertools/manager.go index 07c98da9ae7..9fc47030c95 100644 --- a/pkg/kubelet/dockertools/manager.go +++ b/pkg/kubelet/dockertools/manager.go @@ -87,6 +87,9 @@ const ( // Remote API version for docker daemon version v1.10 // https://docs.docker.com/engine/reference/api/docker_remote_api/ dockerV110APIVersion = "1.22" + + // The expiration time of version cache. + versionCacheTTL = 60 * time.Second ) var ( @@ -161,11 +164,11 @@ type DockerManager struct { // it might already be true. configureHairpinMode bool - // The api version cache of docker daemon. - versionCache *cache.VersionCache - // Provides image stats *imageStatsProvider + + // The version cache of docker daemon. + versionCache *cache.ObjectCache } // A subset of the pod.Manager interface extracted for testing purposes. @@ -253,6 +256,13 @@ func NewDockerManager( } dm.containerGC = NewContainerGC(client, podGetter, containerLogsDir) + dm.versionCache = cache.NewObjectCache( + func() (interface{}, error) { + return dm.getVersionInfo() + }, + versionCacheTTL, + ) + // apply optional settings.. for _, optf := range options { optf(dm) @@ -1554,16 +1564,24 @@ func (dm *DockerManager) calculateOomScoreAdj(container *api.Container) int { return oomScoreAdj } +// versionInfo wraps api version and daemon version. +type versionInfo struct { + apiVersion kubecontainer.Version + daemonVersion kubecontainer.Version +} + // checkDockerAPIVersion checks current docker API version against expected version. // Return: // 1 : newer than expected version // -1: older than expected version // 0 : same version func (dm *DockerManager) checkDockerAPIVersion(expectedVersion string) (int, error) { - apiVersion, _, err := dm.getVersionInfo() + + value, err := dm.versionCache.Get(dm.machineInfo.MachineID) if err != nil { return 0, err } + apiVersion := value.(versionInfo).apiVersion result, err := apiVersion.Compare(expectedVersion) if err != nil { return 0, fmt.Errorf("Failed to compare current docker api version %v with OOMScoreAdj supported Docker version %q - %v", @@ -2156,15 +2174,17 @@ func (dm *DockerManager) GetPodStatus(uid types.UID, name, namespace string) (*k } // getVersionInfo returns apiVersion & daemonVersion of docker runtime -func (dm *DockerManager) getVersionInfo() (kubecontainer.Version, kubecontainer.Version, error) { +func (dm *DockerManager) getVersionInfo() (versionInfo, error) { apiVersion, err := dm.APIVersion() if err != nil { - return nil, nil, err + return versionInfo{}, err } daemonVersion, err := dm.Version() if err != nil { - return nil, nil, err + return versionInfo{}, err } - - return apiVersion, daemonVersion, nil + return versionInfo{ + apiVersion: apiVersion, + daemonVersion: daemonVersion, + }, nil } diff --git a/pkg/kubelet/util/cache/object_cache.go b/pkg/kubelet/util/cache/object_cache.go new file mode 100644 index 00000000000..9bb809c0d06 --- /dev/null +++ b/pkg/kubelet/util/cache/object_cache.go @@ -0,0 +1,84 @@ +/* +Copyright 2016 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cache + +import ( + "time" + + expirationCache "k8s.io/kubernetes/pkg/client/cache" +) + +// ObjectCache is a simple wrapper of expiration cache that +// 1. use string type key +// 2. has a updater to get value directly if it is expired +// 3. then update the cache +type ObjectCache struct { + cache expirationCache.Store + updater func() (interface{}, error) +} + +// objectEntry is a object with string type key. +type objectEntry struct { + key string + obj interface{} +} + +// NewObjectCache creates ObjectCache with a updater. +// updater returns a object to cache. +func NewObjectCache(f func() (interface{}, error), ttl time.Duration) *ObjectCache { + return &ObjectCache{ + updater: f, + cache: expirationCache.NewTTLStore(stringKeyFunc, ttl), + } +} + +// stringKeyFunc is a string as cache key function +func stringKeyFunc(obj interface{}) (string, error) { + key := obj.(objectEntry).key + return key, nil +} + +// Get gets cached objectEntry by using a unique string as the key. +func (c *ObjectCache) Get(key string) (interface{}, error) { + value, ok, err := c.cache.Get(objectEntry{key: key}) + if err != nil { + return nil, err + } + if !ok { + obj, err := c.updater() + if err != nil { + return nil, err + } + err = c.cache.Add(objectEntry{ + key: key, + obj: obj, + }) + if err != nil { + return nil, err + } + return obj, nil + } + return value.(objectEntry).obj, nil +} + +func (c *ObjectCache) Add(key string, obj interface{}) error { + err := c.cache.Add(objectEntry{key: key, obj: obj}) + if err != nil { + return err + } + return nil +} diff --git a/pkg/kubelet/util/cache/object_cache_test.go b/pkg/kubelet/util/cache/object_cache_test.go new file mode 100644 index 00000000000..e53ae0df02b --- /dev/null +++ b/pkg/kubelet/util/cache/object_cache_test.go @@ -0,0 +1,96 @@ +/* +Copyright 2016 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cache + +import ( + "fmt" + "testing" + "time" + + expirationCache "k8s.io/kubernetes/pkg/client/cache" + "k8s.io/kubernetes/pkg/util" +) + +type testObject struct { + key string + val string +} + +// A fake objectCache for unit test. +func NewFakeObjectCache(f func() (interface{}, error), ttl time.Duration, clock util.Clock) *ObjectCache { + ttlPolicy := &expirationCache.TTLPolicy{Ttl: ttl, Clock: clock} + deleteChan := make(chan string, 1) + return &ObjectCache{ + updater: f, + cache: expirationCache.NewFakeExpirationStore(stringKeyFunc, deleteChan, ttlPolicy, clock), + } +} + +func TestAddAndGet(t *testing.T) { + testObj := testObject{ + key: "foo", + val: "bar", + } + objectCache := NewFakeObjectCache(func() (interface{}, error) { + return nil, fmt.Errorf("Unexpected Error: updater should never be called in this test!") + }, 1*time.Hour, util.NewFakeClock(time.Now())) + + err := objectCache.Add(testObj.key, testObj.val) + if err != nil { + t.Errorf("Unable to add obj %#v by key: %s", testObj, testObj.key) + } + value, err := objectCache.Get(testObj.key) + if err != nil { + t.Errorf("Unable to get obj %#v by key: %s", testObj, testObj.key) + } + if value.(string) != testObj.val { + t.Errorf("Expected to get cached value: %#v, but got: %s", testObj.val, value.(string)) + } + +} + +func TestExpirationBasic(t *testing.T) { + unexpectedVal := "bar" + expectedVal := "bar2" + + testObj := testObject{ + key: "foo", + val: unexpectedVal, + } + + fakeClock := util.NewFakeClock(time.Now()) + + objectCache := NewFakeObjectCache(func() (interface{}, error) { + return expectedVal, nil + }, 1*time.Second, fakeClock) + + err := objectCache.Add(testObj.key, testObj.val) + if err != nil { + t.Errorf("Unable to add obj %#v by key: %s", testObj, testObj.key) + } + + // sleep 2s so cache should be expired. + fakeClock.Sleep(2 * time.Second) + + value, err := objectCache.Get(testObj.key) + if err != nil { + t.Errorf("Unable to get obj %#v by key: %s", testObj, testObj.key) + } + if value.(string) != expectedVal { + t.Errorf("Expected to get cached value: %#v, but got: %s", expectedVal, value.(string)) + } +} diff --git a/pkg/kubelet/util/cache/version_cache.go b/pkg/kubelet/util/cache/version_cache.go deleted file mode 100644 index f3d34e1a1f3..00000000000 --- a/pkg/kubelet/util/cache/version_cache.go +++ /dev/null @@ -1,80 +0,0 @@ -/* -Copyright 2016 The Kubernetes Authors All rights reserved. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package cache - -import ( - "fmt" - "sync" - "time" - - "github.com/golang/glog" - - kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" - "k8s.io/kubernetes/pkg/util/wait" -) - -type VersionCache struct { - lock sync.RWMutex - cache map[string]versionInfo - updater func() (kubecontainer.Version, kubecontainer.Version, error) -} - -// versionInfo caches api version and daemon version. -type versionInfo struct { - apiVersion kubecontainer.Version - version kubecontainer.Version -} - -const maxVersionCacheEntries = 1000 - -func NewVersionCache(f func() (kubecontainer.Version, kubecontainer.Version, error)) *VersionCache { - return &VersionCache{ - cache: map[string]versionInfo{}, - updater: f, - } -} - -// Update updates cached versionInfo by using a unique string (e.g. machineInfo) as the key. -func (c *VersionCache) Update(key string) { - apiVersion, daemonVersion, err := c.updater() - - if err != nil { - glog.Errorf("Fail to get version info from container runtime: %v", err) - } else { - c.lock.Lock() - defer c.lock.Unlock() - c.cache[key] = versionInfo{apiVersion, daemonVersion} - } -} - -// Get gets cached versionInfo by using a unique string (e.g. machineInfo) as the key. -// It returns apiVersion first and followed by daemon version. -func (c *VersionCache) Get(key string) (kubecontainer.Version, kubecontainer.Version, error) { - c.lock.RLock() - defer c.lock.RUnlock() - value, ok := c.cache[key] - if !ok { - return nil, nil, fmt.Errorf("Failed to get version info from cache by key: %v", key) - } - return value.apiVersion, value.version, nil -} - -func (c *VersionCache) UpdateCachePeriodly(key string) { - go wait.Until(func() { - c.Update(key) - }, 1*time.Minute, wait.NeverStop) -}