From 6e70853e83fa0c552ed0fa572cec9ad392105247 Mon Sep 17 00:00:00 2001 From: Daniel Smith Date: Fri, 9 Jan 2015 14:06:30 -0800 Subject: [PATCH 1/3] genericize ip cache --- pkg/master/ip_cache.go | 44 ++++++---------------- pkg/util/time_cache.go | 75 +++++++++++++++++++++++++++++++++++++ pkg/util/time_cache_test.go | 63 +++++++++++++++++++++++++++++++ 3 files changed, 150 insertions(+), 32 deletions(-) create mode 100644 pkg/util/time_cache.go create mode 100644 pkg/util/time_cache_test.go diff --git a/pkg/master/ip_cache.go b/pkg/master/ip_cache.go index 05a59c647c2..efbc06b2ae5 100644 --- a/pkg/master/ip_cache.go +++ b/pkg/master/ip_cache.go @@ -17,7 +17,6 @@ limitations under the License. package master import ( - "sync" "time" "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider" @@ -26,19 +25,6 @@ import ( "github.com/golang/glog" ) -type ipCacheEntry struct { - ip string - lastUpdate time.Time -} - -type ipCache struct { - clock util.Clock - cloudProvider cloudprovider.Interface - cache map[string]ipCacheEntry - lock sync.Mutex - ttl time.Duration -} - // NewIPCache makes a new ip caching layer, which will get IP addresses from cp, // and use clock for deciding when to re-get an IP address. // Thread-safe. @@ -47,30 +33,24 @@ type ipCache struct { // that could be produced from a template and a type via `go generate`. func NewIPCache(cp cloudprovider.Interface, clock util.Clock, ttl time.Duration) *ipCache { return &ipCache{ - clock: clock, - cloudProvider: cp, - cache: map[string]ipCacheEntry{}, - ttl: ttl, + cache: util.NewTimeCache( + clock, + ttl, + func(host string) util.T { + return getInstanceIPFromCloud(cp, host) + }, + ), } } +type ipCache struct { + cache util.TimeCache +} + // GetInstanceIP returns the IP address of host, from the cache // if possible, otherwise it asks the cloud provider. func (c *ipCache) GetInstanceIP(host string) string { - c.lock.Lock() - defer c.lock.Unlock() - data, ok := c.cache[host] - now := c.clock.Now() - - if !ok || now.Sub(data.lastUpdate) > c.ttl { - ip := getInstanceIPFromCloud(c.cloudProvider, host) - data = ipCacheEntry{ - ip: ip, - lastUpdate: now, - } - c.cache[host] = data - } - return data.ip + return c.cache.Get(host).(string) } func getInstanceIPFromCloud(cloud cloudprovider.Interface, host string) string { diff --git a/pkg/util/time_cache.go b/pkg/util/time_cache.go new file mode 100644 index 00000000000..7a9bf4a9d0b --- /dev/null +++ b/pkg/util/time_cache.go @@ -0,0 +1,75 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package util + +import ( + "sync" + "time" +) + +// T stands in for any type in TimeCache +// Should make it easy to use this as a template for an autogenerator +// if we ever start doing that. +type T interface{} + +type TimeCache interface { + // Get will fetch an item from the cache if + // it is present and recent enough. + Get(key string) T +} + +type timeCacheEntry struct { + item T + lastUpdate time.Time +} + +type timeCache struct { + clock Clock + fillFunc func(string) T + cache map[string]timeCacheEntry + lock sync.Mutex + ttl time.Duration +} + +// NewTimeCache returns a cache which calls fill to fill its entries, and +// forgets entries after ttl has passed. +func NewTimeCache(clock Clock, ttl time.Duration, fill func(key string) T) TimeCache { + return &timeCache{ + clock: clock, + fillFunc: fill, + cache: map[string]timeCacheEntry{}, + ttl: ttl, + } +} + +// Get returns the value of key from the cache, if it is present +// and recent enough; otherwise, it blocks while it gets the value. +func (c *timeCache) Get(key string) T { + c.lock.Lock() + defer c.lock.Unlock() + data, ok := c.cache[key] + now := c.clock.Now() + + if !ok || now.Sub(data.lastUpdate) > c.ttl { + data = timeCacheEntry{ + item: c.fillFunc(key), + lastUpdate: now, + } + c.cache[key] = data + } + return data.item +} diff --git a/pkg/util/time_cache_test.go b/pkg/util/time_cache_test.go new file mode 100644 index 00000000000..1f42b08dcdd --- /dev/null +++ b/pkg/util/time_cache_test.go @@ -0,0 +1,63 @@ +/* +Copyright 2015 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package util + +import ( + "testing" + "time" +) + +func TestCacheExpire(t *testing.T) { + calls := map[string]int{} + ff := func(key string) T { calls[key]++; return key } + clock := &FakeClock{time.Now()} + + c := NewTimeCache(clock, 60*time.Second, ff) + + c.Get("foo") + c.Get("bar") + // This call should hit the cache, so we expect no additional calls + c.Get("foo") + // Advance the clock, this call should miss the cache, so expect one more call. + clock.Time = clock.Time.Add(61 * time.Second) + c.Get("foo") + c.Get("bar") + + if e, a := 2, calls["foo"]; e != a { + t.Errorf("Wrong number of calls for foo: wanted %v, got %v", e, a) + } + if e, a := 2, calls["bar"]; e != a { + t.Errorf("Wrong number of calls for bar: wanted %v, got %v", e, a) + } +} + +func TestCacheNotExpire(t *testing.T) { + calls := map[string]int{} + ff := func(key string) T { calls[key]++; return key } + clock := &FakeClock{time.Now()} + + c := NewTimeCache(clock, 60*time.Second, ff) + + c.Get("foo") + // This call should hit the cache, so we expect no additional calls to the cloud + clock.Time = clock.Time.Add(60 * time.Second) + c.Get("foo") + + if e, a := 1, calls["foo"]; e != a { + t.Errorf("Wrong number of calls for foo: wanted %v, got %v", e, a) + } +} From f0b5fba988448938ae5446b0b833e848e7df2547 Mon Sep 17 00:00:00 2001 From: Daniel Smith Date: Fri, 9 Jan 2015 15:33:58 -0800 Subject: [PATCH 2/3] Improve cache behavior * don't block while filling cache * make cache wait for pending calls --- pkg/util/time_cache.go | 72 +++++++++++++++++++++++++++++++------ pkg/util/time_cache_test.go | 54 ++++++++++++++++++++++++++++ 2 files changed, 115 insertions(+), 11 deletions(-) diff --git a/pkg/util/time_cache.go b/pkg/util/time_cache.go index 7a9bf4a9d0b..bcb06231bc2 100644 --- a/pkg/util/time_cache.go +++ b/pkg/util/time_cache.go @@ -40,9 +40,13 @@ type timeCacheEntry struct { type timeCache struct { clock Clock fillFunc func(string) T - cache map[string]timeCacheEntry - lock sync.Mutex ttl time.Duration + + inFlight map[string]chan T + inFlightLock sync.Mutex + + cache map[string]timeCacheEntry + lock sync.RWMutex } // NewTimeCache returns a cache which calls fill to fill its entries, and @@ -51,6 +55,7 @@ func NewTimeCache(clock Clock, ttl time.Duration, fill func(key string) T) TimeC return &timeCache{ clock: clock, fillFunc: fill, + inFlight: map[string]chan T{}, cache: map[string]timeCacheEntry{}, ttl: ttl, } @@ -59,17 +64,62 @@ func NewTimeCache(clock Clock, ttl time.Duration, fill func(key string) T) TimeC // Get returns the value of key from the cache, if it is present // and recent enough; otherwise, it blocks while it gets the value. func (c *timeCache) Get(key string) T { - c.lock.Lock() - defer c.lock.Unlock() + if item, ok := c.get(key); ok { + return item + } + + // We need to fill the cache. Calling the function could be + // expensive, so do it while unlocked. + wait := c.fillOrWait(key) + item := <-wait + + // Put it back in the channel in case there's multiple waiters + // (this channel is non-blocking) + wait <- item + return item +} + +// returns the item and true if it is found and not expired, otherwise nil and false. +func (c *timeCache) get(key string) (T, bool) { + c.lock.RLock() + defer c.lock.RUnlock() data, ok := c.cache[key] now := c.clock.Now() - if !ok || now.Sub(data.lastUpdate) > c.ttl { - data = timeCacheEntry{ - item: c.fillFunc(key), - lastUpdate: now, - } - c.cache[key] = data + return nil, false } - return data.item + return data.item, true +} + +func (c *timeCache) fillOrWait(key string) chan T { + c.inFlightLock.Lock() + defer c.inFlightLock.Unlock() + + // Already a call in progress? + if current, ok := c.inFlight[key]; ok { + return current + } + + // We are the first, so we have to make the call. + result := make(chan T, 1) // non-blocking + c.inFlight[key] = result + go func() { + // Make potentially slow call + data := timeCacheEntry{ + item: c.fillFunc(key), + lastUpdate: c.clock.Now(), + } + result <- data.item + + // Store in cache + c.lock.Lock() + c.cache[key] = data + c.lock.Unlock() + + // Remove in flight entry + c.inFlightLock.Lock() + delete(c.inFlight, key) + c.inFlightLock.Unlock() + }() + return result } diff --git a/pkg/util/time_cache_test.go b/pkg/util/time_cache_test.go index 1f42b08dcdd..7e8f248d68f 100644 --- a/pkg/util/time_cache_test.go +++ b/pkg/util/time_cache_test.go @@ -17,8 +17,11 @@ limitations under the License. package util import ( + "sync" "testing" "time" + + fuzz "github.com/google/gofuzz" ) func TestCacheExpire(t *testing.T) { @@ -61,3 +64,54 @@ func TestCacheNotExpire(t *testing.T) { t.Errorf("Wrong number of calls for foo: wanted %v, got %v", e, a) } } + +func TestCacheParallel(t *testing.T) { + ff := func(key string) T { time.Sleep(time.Second); return key } + clock := &FakeClock{time.Now()} + c := NewTimeCache(clock, 60*time.Second, ff) + + // Make some keys + keys := []string{} + fuzz.New().NilChance(0).NumElements(50, 50).Fuzz(&keys) + + // If we have high parallelism, this will take only a second. + var wg sync.WaitGroup + wg.Add(len(keys)) + for _, key := range keys { + go func(key string) { + c.Get(key) + wg.Done() + }(key) + } + wg.Wait() +} + +func TestCacheParallelOneCall(t *testing.T) { + calls := 0 + var callLock sync.Mutex + ff := func(key string) T { + time.Sleep(time.Second) + callLock.Lock() + defer callLock.Unlock() + calls++ + return key + } + clock := &FakeClock{time.Now()} + c := NewTimeCache(clock, 60*time.Second, ff) + + // If we have high parallelism, this will take only a second. + var wg sync.WaitGroup + wg.Add(50) + for i := 0; i < 50; i++ { + go func(key string) { + c.Get(key) + wg.Done() + }("aoeu") + } + wg.Wait() + + // And if we wait for existing calls, we should have only one call. + if e, a := 1, calls; e != a { + t.Errorf("Expected %v, got %v", e, a) + } +} From 3e75195ff3cddf8cb21eb7e187438e5288014901 Mon Sep 17 00:00:00 2001 From: Daniel Smith Date: Fri, 9 Jan 2015 14:38:31 -0800 Subject: [PATCH 3/3] make minion registry not intolerably slow --- pkg/master/master.go | 8 ++-- pkg/registry/minion/healthy_registry.go | 41 +++++++++++++++----- pkg/registry/minion/healthy_registry_test.go | 22 +++++++---- pkg/registry/minion/rest_test.go | 14 ++++--- 4 files changed, 59 insertions(+), 26 deletions(-) diff --git a/pkg/master/master.go b/pkg/master/master.go index c36c8ffed26..9dfe3af3e4e 100644 --- a/pkg/master/master.go +++ b/pkg/master/master.go @@ -125,6 +125,7 @@ type Master struct { admissionControl admission.Interface masterCount int v1beta3 bool + nodeIPCache IPGetter readOnlyServer string readWriteServer string @@ -256,6 +257,7 @@ func New(c *Config) *Master { authorizer: c.Authorizer, admissionControl: c.AdmissionControl, v1beta3: c.EnableV1Beta3, + nodeIPCache: NewIPCache(c.Cloud, util.RealClock{}, 30*time.Second), masterCount: c.MasterCount, readOnlyServer: net.JoinHostPort(c.PublicAddress, strconv.Itoa(int(c.ReadOnlyPort))), @@ -319,8 +321,9 @@ func logStackOnRecover(panicReason interface{}, httpWriter http.ResponseWriter) func makeMinionRegistry(c *Config) minion.Registry { var minionRegistry minion.Registry = etcd.NewRegistry(c.EtcdHelper, nil) + // TODO: plumb in nodeIPCache here if c.HealthCheckMinions { - minionRegistry = minion.NewHealthyRegistry(minionRegistry, c.KubeletClient) + minionRegistry = minion.NewHealthyRegistry(minionRegistry, c.KubeletClient, util.RealClock{}, 20*time.Second) } return minionRegistry } @@ -331,9 +334,8 @@ func (m *Master) init(c *Config) { var authenticator = c.Authenticator nodeRESTStorage := minion.NewREST(m.minionRegistry) - ipCache := NewIPCache(c.Cloud, util.RealClock{}, 30*time.Second) podCache := NewPodCache( - ipCache, + m.nodeIPCache, c.KubeletClient, RESTStorageToNodes(nodeRESTStorage).Nodes(), m.podRegistry, diff --git a/pkg/registry/minion/healthy_registry.go b/pkg/registry/minion/healthy_registry.go index dfdaa9875fc..3524413ee3c 100644 --- a/pkg/registry/minion/healthy_registry.go +++ b/pkg/registry/minion/healthy_registry.go @@ -17,23 +17,30 @@ limitations under the License. package minion import ( + "sync" + "time" + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/client" "github.com/GoogleCloudPlatform/kubernetes/pkg/health" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" + "github.com/GoogleCloudPlatform/kubernetes/pkg/util" "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" ) type HealthyRegistry struct { delegate Registry client client.KubeletHealthChecker + cache util.TimeCache } -func NewHealthyRegistry(delegate Registry, client client.KubeletHealthChecker) Registry { - return &HealthyRegistry{ +func NewHealthyRegistry(delegate Registry, client client.KubeletHealthChecker, clock util.Clock, ttl time.Duration) Registry { + h := &HealthyRegistry{ delegate: delegate, client: client, } + h.cache = util.NewTimeCache(clock, ttl, h.doCheck) + return h } func (r *HealthyRegistry) GetMinion(ctx api.Context, minionID string) (*api.Node, error) { @@ -61,9 +68,17 @@ func (r *HealthyRegistry) ListMinions(ctx api.Context) (currentMinions *api.Node if err != nil { return nil, err } + + // In case the cache is empty, health check in parallel instead of serially. + var wg sync.WaitGroup + wg.Add(len(list.Items)) for i := range list.Items { - list.Items[i] = *r.checkMinion(&list.Items[i]) + go func(i int) { + list.Items[i] = *r.checkMinion(&list.Items[i]) + wg.Done() + }(i) } + wg.Wait() return list, nil } @@ -81,13 +96,7 @@ func (r *HealthyRegistry) WatchMinions(ctx api.Context, label, field labels.Sele } func (r *HealthyRegistry) checkMinion(node *api.Node) *api.Node { - condition := api.ConditionFull - switch status, err := r.client.HealthCheck(node.Name); { - case err != nil: - condition = api.ConditionUnknown - case status == health.Unhealthy: - condition = api.ConditionNone - } + condition := r.cache.Get(node.Name).(api.NodeConditionStatus) // TODO: distinguish other conditions like Reachable/Live, and begin storing this // data on nodes directly via sync loops. node.Status.Conditions = append(node.Status.Conditions, api.NodeCondition{ @@ -96,3 +105,15 @@ func (r *HealthyRegistry) checkMinion(node *api.Node) *api.Node { }) return node } + +// This is called to fill the cache. +func (r *HealthyRegistry) doCheck(key string) util.T { + switch status, err := r.client.HealthCheck(key); { + case err != nil: + return api.ConditionUnknown + case status == health.Unhealthy: + return api.ConditionNone + default: + return api.ConditionFull + } +} diff --git a/pkg/registry/minion/healthy_registry_test.go b/pkg/registry/minion/healthy_registry_test.go index f7a30ca6d21..ba4ea3bb531 100644 --- a/pkg/registry/minion/healthy_registry_test.go +++ b/pkg/registry/minion/healthy_registry_test.go @@ -19,10 +19,12 @@ package minion import ( "reflect" "testing" + "time" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/health" "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/registrytest" + "github.com/GoogleCloudPlatform/kubernetes/pkg/util" ) type alwaysYes struct{} @@ -34,10 +36,12 @@ func (alwaysYes) HealthCheck(host string) (health.Status, error) { func TestBasicDelegation(t *testing.T) { ctx := api.NewContext() mockMinionRegistry := registrytest.NewMinionRegistry([]string{"m1", "m2", "m3"}, api.NodeResources{}) - healthy := HealthyRegistry{ - delegate: mockMinionRegistry, - client: alwaysYes{}, - } + healthy := NewHealthyRegistry( + mockMinionRegistry, + alwaysYes{}, + &util.FakeClock{}, + 60*time.Second, + ) list, err := healthy.ListMinions(ctx) if err != nil { t.Errorf("unexpected error: %v", err) @@ -82,10 +86,12 @@ func (n *notMinion) HealthCheck(host string) (health.Status, error) { func TestFiltering(t *testing.T) { ctx := api.NewContext() mockMinionRegistry := registrytest.NewMinionRegistry([]string{"m1", "m2", "m3"}, api.NodeResources{}) - healthy := HealthyRegistry{ - delegate: mockMinionRegistry, - client: ¬Minion{minion: "m1"}, - } + healthy := NewHealthyRegistry( + mockMinionRegistry, + ¬Minion{minion: "m1"}, + &util.FakeClock{}, + 60*time.Second, + ) expected := []string{"m1", "m2", "m3"} list, err := healthy.ListMinions(ctx) if err != nil { diff --git a/pkg/registry/minion/rest_test.go b/pkg/registry/minion/rest_test.go index 4b4666d06bb..8793ff595bd 100644 --- a/pkg/registry/minion/rest_test.go +++ b/pkg/registry/minion/rest_test.go @@ -18,11 +18,13 @@ package minion import ( "testing" + "time" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/registrytest" + "github.com/GoogleCloudPlatform/kubernetes/pkg/util" ) func TestMinionRegistryREST(t *testing.T) { @@ -89,12 +91,14 @@ func TestMinionRegistryREST(t *testing.T) { func TestMinionRegistryHealthCheck(t *testing.T) { minionRegistry := registrytest.NewMinionRegistry([]string{}, api.NodeResources{}) - minionHealthRegistry := HealthyRegistry{ - delegate: minionRegistry, - client: ¬Minion{minion: "m1"}, - } + minionHealthRegistry := NewHealthyRegistry( + minionRegistry, + ¬Minion{minion: "m1"}, + &util.FakeClock{}, + 60*time.Second, + ) - ms := NewREST(&minionHealthRegistry) + ms := NewREST(minionHealthRegistry) ctx := api.NewContext() c, err := ms.Create(ctx, &api.Node{ObjectMeta: api.ObjectMeta{Name: "m1"}})