From 487d34e409a96639e29b8977928c8bd2d4b47ec8 Mon Sep 17 00:00:00 2001 From: Yifan Gu Date: Wed, 11 Mar 2015 14:55:06 -0700 Subject: [PATCH] kubelet: add container runtime cache and fake runtime. --- pkg/kubelet/container/fake_runtime.go | 203 +++++++++++++++++++++++++ pkg/kubelet/container/runtime_cache.go | 123 +++++++++++++++ 2 files changed, 326 insertions(+) create mode 100644 pkg/kubelet/container/fake_runtime.go create mode 100644 pkg/kubelet/container/runtime_cache.go diff --git a/pkg/kubelet/container/fake_runtime.go b/pkg/kubelet/container/fake_runtime.go new file mode 100644 index 00000000000..ffdf4c8c92d --- /dev/null +++ b/pkg/kubelet/container/fake_runtime.go @@ -0,0 +1,203 @@ +/* +Copyright 2015 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package container + +import ( + "fmt" + "reflect" + "sync" + "time" + + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/volume" +) + +// FakeRuntime is a fake container runtime for testing. +type FakeRuntime struct { + sync.Mutex + CalledFunctions []string + Podlist []*Pod + ContainerList []*Container + PodStatus api.PodStatus + StartedPods []string + KilledPods []string + StartedContainers []string + KilledContainers []string + VersionInfo map[string]string + Err error +} + +type FakeRuntimeCache struct { + runtime Runtime +} + +func NewFakeRuntimeCache(runtime Runtime) RuntimeCache { + return &FakeRuntimeCache{runtime} +} + +func (f *FakeRuntimeCache) GetPods() ([]*Pod, error) { + return f.runtime.GetPods(false) +} + +func (f *FakeRuntimeCache) ForceUpdateIfOlder(time.Time) error { + return nil +} + +// ClearCalls resets the FakeRuntime to the initial state. +func (f *FakeRuntime) ClearCalls() { + f.Lock() + defer f.Unlock() + + f.CalledFunctions = []string{} + f.Podlist = []*Pod{} + f.ContainerList = []*Container{} + f.PodStatus = api.PodStatus{} + f.StartedPods = []string{} + f.KilledPods = []string{} + f.StartedContainers = []string{} + f.KilledContainers = []string{} + f.VersionInfo = map[string]string{} + f.Err = nil +} + +func (f *FakeRuntime) assertList(expect []string, test []string) error { + if !reflect.DeepEqual(expect, test) { + return fmt.Errorf("expected %#v, got %#v", expect, test) + } + return nil +} + +// AssertCalls test if the invoked functions are as expected. +func (f *FakeRuntime) AssertCalls(calls []string) error { + f.Lock() + defer f.Unlock() + return f.assertList(calls, f.CalledFunctions) +} + +func (f *FakeRuntime) AssertStartedPods(pods []string) error { + f.Lock() + defer f.Unlock() + return f.assertList(pods, f.StartedPods) +} + +func (f *FakeRuntime) AssertKilledPods(pods []string) error { + f.Lock() + defer f.Unlock() + return f.assertList(pods, f.KilledPods) +} + +func (f *FakeRuntime) AssertStartedContainers(containers []string) error { + f.Lock() + defer f.Unlock() + return f.assertList(containers, f.StartedContainers) +} + +func (f *FakeRuntime) AssertKilledContainers(containers []string) error { + f.Lock() + defer f.Unlock() + return f.assertList(containers, f.KilledContainers) +} + +func (f *FakeRuntime) Version() (map[string]string, error) { + f.Lock() + defer f.Unlock() + + f.CalledFunctions = append(f.CalledFunctions, "Version") + return f.VersionInfo, f.Err +} + +func (f *FakeRuntime) GetPods(all bool) ([]*Pod, error) { + f.Lock() + defer f.Unlock() + + f.CalledFunctions = append(f.CalledFunctions, "GetPods") + return f.Podlist, f.Err +} + +func (f *FakeRuntime) RunPod(pod *api.Pod, volumeMap map[string]volume.Interface) error { + f.Lock() + defer f.Unlock() + + f.CalledFunctions = append(f.CalledFunctions, "RunPod") + f.StartedPods = append(f.StartedPods, string(pod.UID)) + for _, c := range pod.Spec.Containers { + f.StartedContainers = append(f.StartedContainers, c.Name) + } + return f.Err +} + +func (f *FakeRuntime) KillPod(pod *api.Pod) error { + f.Lock() + defer f.Unlock() + + f.CalledFunctions = append(f.CalledFunctions, "KillPod") + f.KilledPods = append(f.KilledPods, string(pod.UID)) + for _, c := range pod.Spec.Containers { + f.KilledContainers = append(f.KilledContainers, c.Name) + } + return f.Err +} + +func (f *FakeRuntime) RunContainerInPod(container api.Container, pod *api.Pod, volumeMap map[string]volume.Interface) error { + f.Lock() + defer f.Unlock() + + f.CalledFunctions = append(f.CalledFunctions, "RunContainerInPod") + f.StartedContainers = append(f.StartedContainers, container.Name) + + pod.Spec.Containers = append(pod.Spec.Containers, container) + for _, c := range pod.Spec.Containers { + if c.Name == container.Name { // Container already in the pod. + return f.Err + } + } + pod.Spec.Containers = append(pod.Spec.Containers, container) + return f.Err +} + +func (f *FakeRuntime) KillContainerInPod(container api.Container, pod *api.Pod) error { + f.Lock() + defer f.Unlock() + + f.CalledFunctions = append(f.CalledFunctions, "KillContainerInPod") + f.KilledContainers = append(f.KilledContainers, container.Name) + + var containers []api.Container + for _, c := range pod.Spec.Containers { + if c.Name == container.Name { + continue + } + containers = append(containers, c) + } + return f.Err +} + +func (f *FakeRuntime) GetPodStatus(pod *Pod) (api.PodStatus, error) { + f.Lock() + defer f.Unlock() + + f.CalledFunctions = append(f.CalledFunctions, "GetPodStatus") + return f.PodStatus, f.Err +} + +func (f *FakeRuntime) GetContainers(all bool) ([]*Container, error) { + f.Lock() + defer f.Unlock() + + f.CalledFunctions = append(f.CalledFunctions, "GetContainers") + return f.ContainerList, f.Err +} diff --git a/pkg/kubelet/container/runtime_cache.go b/pkg/kubelet/container/runtime_cache.go new file mode 100644 index 00000000000..6bda534df56 --- /dev/null +++ b/pkg/kubelet/container/runtime_cache.go @@ -0,0 +1,123 @@ +/* +Copyright 2015 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package container + +import ( + "sync" + "time" +) + +var ( + // TODO(yifan): Maybe set the them as parameters for NewCache(). + defaultCachePeriod = time.Second * 2 + defaultUpdateInterval = time.Millisecond * 100 +) + +type RuntimeCache interface { + GetPods() ([]*Pod, error) + ForceUpdateIfOlder(time.Time) error +} + +// NewRuntimeCache creates a container runtime cache. +func NewRuntimeCache(runtime Runtime) (RuntimeCache, error) { + pods, err := runtime.GetPods(false) + if err != nil { + return nil, err + } + return &runtimeCache{ + runtime: runtime, + cacheTime: time.Now(), + pods: pods, + updating: false, + }, nil +} + +type runtimeCache struct { + sync.Mutex + // The underlying container runtime used to update the cache. + runtime Runtime + // Last time when cache was updated. + cacheTime time.Time + // The content of the cache. + pods []*Pod + // Whether the background thread updating the cache is running. + updating bool + // Time when the background thread should be stopped. + updatingThreadStopTime time.Time +} + +// GetPods returns the cached result for ListPods if the result is not +// outdated, otherwise it will retrieve the newest result. +// If the cache updating loop has stopped, this function will restart it. +func (r *runtimeCache) GetPods() ([]*Pod, error) { + r.Lock() + defer r.Unlock() + if time.Since(r.cacheTime) > defaultCachePeriod { + if err := r.updateCache(); err != nil { + return nil, err + } + } + // Stop refreshing thread if there were no requests within the default cache period + r.updatingThreadStopTime = time.Now().Add(defaultCachePeriod) + if !r.updating { + r.updating = true + go r.startUpdatingCache() + } + return r.pods, nil +} + +func (r *runtimeCache) ForceUpdateIfOlder(minExpectedCacheTime time.Time) error { + r.Lock() + defer r.Unlock() + if r.cacheTime.Before(minExpectedCacheTime) { + return r.updateCache() + } + return nil +} + +func (r *runtimeCache) updateCache() error { + pods, err := r.runtime.GetPods(false) + if err != nil { + return err + } + r.pods = pods + r.cacheTime = time.Now() + return nil +} + +// startUpdateingCache continues to invoke GetPods to get the newest result until +// there is no requests within the default cache period. +func (r *runtimeCache) startUpdatingCache() { + run := true + for run { + time.Sleep(defaultUpdateInterval) + pods, err := r.runtime.GetPods(false) + cacheTime := time.Now() + if err != nil { + continue + } + + r.Lock() + if time.Now().After(r.updatingThreadStopTime) { + r.updating = false + run = false + } + r.pods = pods + r.cacheTime = cacheTime + r.Unlock() + } +}