From 4f549aae54ef40f7d9329ca0f75bcfc484505142 Mon Sep 17 00:00:00 2001 From: Brendan Burns Date: Wed, 18 Jun 2014 15:18:38 -0700 Subject: [PATCH] Add a PodCache that is responsible for caching pod information. Not wired in yet. --- pkg/master/pod_cache.go | 99 +++++++++++++++++++++++++ pkg/master/pod_cache_test.go | 118 ++++++++++++++++++++++++++++++ pkg/registry/mock_registry.go | 62 ++++++++++++++++ pkg/registry/pod_registry_test.go | 34 --------- 4 files changed, 279 insertions(+), 34 deletions(-) create mode 100644 pkg/master/pod_cache.go create mode 100644 pkg/master/pod_cache_test.go create mode 100644 pkg/registry/mock_registry.go diff --git a/pkg/master/pod_cache.go b/pkg/master/pod_cache.go new file mode 100644 index 00000000000..2b4c7cc2555 --- /dev/null +++ b/pkg/master/pod_cache.go @@ -0,0 +1,99 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package master + +import ( + "fmt" + "log" + "sync" + "time" + + "github.com/GoogleCloudPlatform/kubernetes/pkg/client" + "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" + "github.com/GoogleCloudPlatform/kubernetes/pkg/registry" + "github.com/GoogleCloudPlatform/kubernetes/pkg/util" +) + +// PodCache contains both a cache of container information, as well as the mechanism for keeping +// that cache up to date. +type PodCache struct { + containerInfo client.ContainerInfo + pods registry.PodRegistry + podInfo map[string]interface{} + period time.Duration + podLock sync.Mutex +} + +func NewPodCache(info client.ContainerInfo, pods registry.PodRegistry, period time.Duration) *PodCache { + return &PodCache{ + containerInfo: info, + pods: pods, + podInfo: map[string]interface{}{}, + period: period, + } +} + +// Implements the ContainerInfo interface +// The returned value should be treated as read-only +func (p *PodCache) GetContainerInfo(host, id string) (interface{}, error) { + p.podLock.Lock() + defer p.podLock.Unlock() + value, ok := p.podInfo[id] + if !ok { + return nil, fmt.Errorf("Couldn't find any information for %s", id) + } else { + return value, nil + } +} + +func (p *PodCache) updateContainerInfo(host, id string) error { + info, err := p.containerInfo.GetContainerInfo(host, id) + if err != nil { + return err + } + p.podLock.Lock() + defer p.podLock.Unlock() + p.podInfo[id] = info + return nil +} + +// Update information about all containers. Either called by Loop() below, or one-off. +func (p *PodCache) UpdateAllContainers() { + pods, err := p.pods.ListPods(labels.Everything()) + if err != nil { + log.Printf("Error synchronizing container: %#v", err) + return + } + for _, pod := range pods { + err := p.updateContainerInfo(pod.CurrentState.Host, pod.ID) + if err != nil { + log.Printf("Error synchronizing container: %#v", err) + } + } +} + +func (p *PodCache) synchronizeContainers() { + ticker := time.Tick(p.period) + for _ = range ticker { + p.UpdateAllContainers() + } +} + +// Loop runs forever, it is expected to be placed in a go routine. +func (p *PodCache) Loop() { + util.Forever(func() { p.synchronizeContainers() }, 0) +} diff --git a/pkg/master/pod_cache_test.go b/pkg/master/pod_cache_test.go new file mode 100644 index 00000000000..e622ee81a09 --- /dev/null +++ b/pkg/master/pod_cache_test.go @@ -0,0 +1,118 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package master + +import ( + "reflect" + "testing" + "time" + + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/registry" +) + +type FakeContainerInfo struct { + host string + id string + data interface{} + err error +} + +func (f *FakeContainerInfo) GetContainerInfo(host, id string) (interface{}, error) { + f.host = host + f.id = id + return f.data, f.err +} + +func TestPodCacheGet(t *testing.T) { + cache := NewPodCache(nil, nil, time.Second*1) + + pod := api.Pod{ + JSONBase: api.JSONBase{ID: "foo"}, + } + cache.podInfo["foo"] = pod + + info, err := cache.GetContainerInfo("host", "foo") + if err != nil { + t.Errorf("Unexpected error: %#v", err) + } + if !reflect.DeepEqual(info, pod) { + t.Errorf("Unexpected mismatch. Expected: %#v, Got: #%v", pod, info) + } +} + +func TestPodCacheGetMissing(t *testing.T) { + cache := NewPodCache(nil, nil, time.Second*1) + + _, err := cache.GetContainerInfo("host", "foo") + if err == nil { + t.Errorf("Unexpected non-error") + } +} + +func TestPodGetContainerInfo(t *testing.T) { + pod := api.Pod{ + JSONBase: api.JSONBase{ID: "foo"}, + } + fake := FakeContainerInfo{ + data: pod, + } + cache := NewPodCache(&fake, nil, time.Second*1) + + cache.updateContainerInfo("host", "foo") + + if fake.host != "host" || fake.id != "foo" { + t.Errorf("Unexpected access: %#v", fake) + } + + info, err := cache.GetContainerInfo("host", "foo") + if err != nil { + t.Errorf("Unexpected error: %#v", err) + } + if !reflect.DeepEqual(info, pod) { + t.Errorf("Unexpected mismatch. Expected: %#v, Got: #%v", pod, info) + } +} + +func TestPodUpdateAllContainers(t *testing.T) { + pod := api.Pod{ + JSONBase: api.JSONBase{ID: "foo"}, + CurrentState: api.PodState{ + Host: "machine", + }, + } + pods := []api.Pod{pod} + mockRegistry := registry.MakeMockPodRegistry(pods) + fake := FakeContainerInfo{ + data: pod, + } + cache := NewPodCache(&fake, mockRegistry, time.Second*1) + + cache.UpdateAllContainers() + + if fake.host != "machine" || fake.id != "foo" { + t.Errorf("Unexpected access: %#v", fake) + } + + info, err := cache.GetContainerInfo("machine", "foo") + if err != nil { + t.Errorf("Unexpected error: %#v", err) + } + if !reflect.DeepEqual(info, pod) { + t.Errorf("Unexpected mismatch. Expected: %#v, Got: #%v", pod, info) + } +} diff --git a/pkg/registry/mock_registry.go b/pkg/registry/mock_registry.go new file mode 100644 index 00000000000..b2e1bff59b9 --- /dev/null +++ b/pkg/registry/mock_registry.go @@ -0,0 +1,62 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package registry + +import ( + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" +) + +type MockPodRegistry struct { + err error + pod *api.Pod + pods []api.Pod +} + +func MakeMockPodRegistry(pods []api.Pod) *MockPodRegistry { + return &MockPodRegistry{ + pods: pods, + } +} + +func (registry *MockPodRegistry) ListPods(query labels.Query) ([]api.Pod, error) { + if registry.err != nil { + return registry.pods, registry.err + } + var filtered []api.Pod + for _, pod := range registry.pods { + if query.Matches(labels.Set(pod.Labels)) { + filtered = append(filtered, pod) + } + } + return filtered, nil +} + +func (registry *MockPodRegistry) GetPod(podId string) (*api.Pod, error) { + return registry.pod, registry.err +} + +func (registry *MockPodRegistry) CreatePod(machine string, pod api.Pod) error { + return registry.err +} + +func (registry *MockPodRegistry) UpdatePod(pod api.Pod) error { + return registry.err +} +func (registry *MockPodRegistry) DeletePod(podId string) error { + return registry.err +} diff --git a/pkg/registry/pod_registry_test.go b/pkg/registry/pod_registry_test.go index 4c29a322d8d..783bbe1c905 100644 --- a/pkg/registry/pod_registry_test.go +++ b/pkg/registry/pod_registry_test.go @@ -26,46 +26,12 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" ) -type MockPodRegistry struct { - err error - pod *api.Pod - pods []api.Pod -} - func expectNoError(t *testing.T, err error) { if err != nil { t.Errorf("Unexpected error: %#v", err) } } -func (registry *MockPodRegistry) ListPods(query labels.Query) ([]api.Pod, error) { - if registry.err != nil { - return registry.pods, registry.err - } - var filtered []api.Pod - for _, pod := range registry.pods { - if query.Matches(labels.Set(pod.Labels)) { - filtered = append(filtered, pod) - } - } - return filtered, nil -} - -func (registry *MockPodRegistry) GetPod(podId string) (*api.Pod, error) { - return registry.pod, registry.err -} - -func (registry *MockPodRegistry) CreatePod(machine string, pod api.Pod) error { - return registry.err -} - -func (registry *MockPodRegistry) UpdatePod(pod api.Pod) error { - return registry.err -} -func (registry *MockPodRegistry) DeletePod(podId string) error { - return registry.err -} - func TestListPodsError(t *testing.T) { mockRegistry := MockPodRegistry{ err: fmt.Errorf("test error"),