Merge pull request #3374 from lavalamp/fix

Fix minion listing
Dawn Chen 2015-01-12 16:36:46 -08:00
commit 26a6628d8a
7 changed files with 313 additions and 58 deletions


@@ -17,7 +17,6 @@ limitations under the License.
package master
import (
"sync"
"time"
"github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider"
@@ -26,19 +25,6 @@ import (
"github.com/golang/glog"
)
type ipCacheEntry struct {
ip string
lastUpdate time.Time
}
type ipCache struct {
clock util.Clock
cloudProvider cloudprovider.Interface
cache map[string]ipCacheEntry
lock sync.Mutex
ttl time.Duration
}
// NewIPCache makes a new ip caching layer, which will get IP addresses from cp,
// and use clock for deciding when to re-get an IP address.
// Thread-safe.
@@ -47,30 +33,24 @@ type ipCache struct {
// that could be produced from a template and a type via `go generate`.
func NewIPCache(cp cloudprovider.Interface, clock util.Clock, ttl time.Duration) *ipCache {
return &ipCache{
clock: clock,
cloudProvider: cp,
cache: map[string]ipCacheEntry{},
ttl: ttl,
cache: util.NewTimeCache(
clock,
ttl,
func(host string) util.T {
return getInstanceIPFromCloud(cp, host)
},
),
}
}
type ipCache struct {
cache util.TimeCache
}
// GetInstanceIP returns the IP address of host, from the cache
// if possible, otherwise it asks the cloud provider.
func (c *ipCache) GetInstanceIP(host string) string {
c.lock.Lock()
defer c.lock.Unlock()
data, ok := c.cache[host]
now := c.clock.Now()
if !ok || now.Sub(data.lastUpdate) > c.ttl {
ip := getInstanceIPFromCloud(c.cloudProvider, host)
data = ipCacheEntry{
ip: ip,
lastUpdate: now,
}
c.cache[host] = data
}
return data.ip
return c.cache.Get(host).(string)
}
func getInstanceIPFromCloud(cloud cloudprovider.Interface, host string) string {

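For orientation, the rewritten ipCache above is now just a thin typed wrapper over the util.TimeCache added later in this commit: the constructor supplies the fill function, and GetInstanceIP only asserts the cached value back to a string. A minimal sketch of that wrapping pattern, where lookup stands in for getInstanceIPFromCloud and the hostIPCache/newHostIPCache names are illustrative, not part of the change:

package master

import (
    "time"

    "github.com/GoogleCloudPlatform/kubernetes/pkg/util"
)

// hostIPCache delegates all caching, TTL, and locking concerns to util.TimeCache.
type hostIPCache struct {
    cache util.TimeCache
}

func newHostIPCache(clock util.Clock, ttl time.Duration, lookup func(host string) string) *hostIPCache {
    return &hostIPCache{
        cache: util.NewTimeCache(clock, ttl, func(host string) util.T {
            // lookup stands in for getInstanceIPFromCloud(cp, host).
            return lookup(host)
        }),
    }
}

// GetInstanceIP blocks on a cache miss while the fill function runs;
// otherwise it returns the cached value until ttl expires.
func (c *hostIPCache) GetInstanceIP(host string) string {
    return c.cache.Get(host).(string)
}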

@@ -125,6 +125,7 @@ type Master struct {
admissionControl admission.Interface
masterCount int
v1beta3 bool
nodeIPCache IPGetter
readOnlyServer string
readWriteServer string
@@ -256,6 +257,7 @@ func New(c *Config) *Master {
authorizer: c.Authorizer,
admissionControl: c.AdmissionControl,
v1beta3: c.EnableV1Beta3,
nodeIPCache: NewIPCache(c.Cloud, util.RealClock{}, 30*time.Second),
masterCount: c.MasterCount,
readOnlyServer: net.JoinHostPort(c.PublicAddress, strconv.Itoa(int(c.ReadOnlyPort))),
@@ -319,8 +321,9 @@ func logStackOnRecover(panicReason interface{}, httpWriter http.ResponseWriter)
func makeMinionRegistry(c *Config) minion.Registry {
var minionRegistry minion.Registry = etcd.NewRegistry(c.EtcdHelper, nil)
// TODO: plumb in nodeIPCache here
if c.HealthCheckMinions {
minionRegistry = minion.NewHealthyRegistry(minionRegistry, c.KubeletClient)
minionRegistry = minion.NewHealthyRegistry(minionRegistry, c.KubeletClient, util.RealClock{}, 20*time.Second)
}
return minionRegistry
}
@@ -331,9 +334,8 @@ func (m *Master) init(c *Config) {
var authenticator = c.Authenticator
nodeRESTStorage := minion.NewREST(m.minionRegistry)
ipCache := NewIPCache(c.Cloud, util.RealClock{}, 30*time.Second)
podCache := NewPodCache(
ipCache,
m.nodeIPCache,
c.KubeletClient,
RESTStorageToNodes(nodeRESTStorage).Nodes(),
m.podRegistry,

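Master now builds the IP cache once in New and shares it through the nodeIPCache field instead of constructing a fresh cache inside init. The field is typed as IPGetter, which this diff does not show; judging from how the cache is consumed by NewPodCache, it is presumably the single-method interface sketched below (an assumption, not part of the change):

// Presumed shape of the IPGetter interface behind the nodeIPCache field;
// the concrete *ipCache from ip_cache.go would satisfy it via GetInstanceIP.
type IPGetter interface {
    GetInstanceIP(host string) string
}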

@@ -17,23 +17,30 @@ limitations under the License.
package minion
import (
"sync"
"time"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
"github.com/GoogleCloudPlatform/kubernetes/pkg/health"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
)
type HealthyRegistry struct {
delegate Registry
client client.KubeletHealthChecker
cache util.TimeCache
}
func NewHealthyRegistry(delegate Registry, client client.KubeletHealthChecker) Registry {
return &HealthyRegistry{
func NewHealthyRegistry(delegate Registry, client client.KubeletHealthChecker, clock util.Clock, ttl time.Duration) Registry {
h := &HealthyRegistry{
delegate: delegate,
client: client,
}
h.cache = util.NewTimeCache(clock, ttl, h.doCheck)
return h
}
func (r *HealthyRegistry) GetMinion(ctx api.Context, minionID string) (*api.Node, error) {
@@ -61,9 +68,17 @@ func (r *HealthyRegistry) ListMinions(ctx api.Context) (currentMinions *api.Node
if err != nil {
return nil, err
}
// In case the cache is empty, do the health checks in parallel instead of serially.
var wg sync.WaitGroup
wg.Add(len(list.Items))
for i := range list.Items {
list.Items[i] = *r.checkMinion(&list.Items[i])
go func(i int) {
list.Items[i] = *r.checkMinion(&list.Items[i])
wg.Done()
}(i)
}
wg.Wait()
return list, nil
}
@@ -81,13 +96,7 @@ func (r *HealthyRegistry) WatchMinions(ctx api.Context, label, field labels.Sele
}
func (r *HealthyRegistry) checkMinion(node *api.Node) *api.Node {
condition := api.ConditionFull
switch status, err := r.client.HealthCheck(node.Name); {
case err != nil:
condition = api.ConditionUnknown
case status == health.Unhealthy:
condition = api.ConditionNone
}
condition := r.cache.Get(node.Name).(api.NodeConditionStatus)
// TODO: distinguish other conditions like Reachable/Live, and begin storing this
// data on nodes directly via sync loops.
node.Status.Conditions = append(node.Status.Conditions, api.NodeCondition{
@@ -96,3 +105,15 @@ func (r *HealthyRegistry) checkMinion(node *api.Node) *api.Node {
})
return node
}
// This is called to fill the cache.
func (r *HealthyRegistry) doCheck(key string) util.T {
switch status, err := r.client.HealthCheck(key); {
case err != nil:
return api.ConditionUnknown
case status == health.Unhealthy:
return api.ConditionNone
default:
return api.ConditionFull
}
}

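Since checkMinion now reads node conditions through the TimeCache filled by doCheck, repeated listings inside the TTL reuse a single HealthCheck probe per node, and a cold cache is filled in parallel by the WaitGroup above. A rough test-style sketch of that behavior, reusing the constructors shown in this diff (the countingChecker type and TestHealthChecksAreCached are hypothetical, and health.Healthy is assumed to be the healthy status constant):

package minion

import (
    "sync"
    "testing"
    "time"

    "github.com/GoogleCloudPlatform/kubernetes/pkg/api"
    "github.com/GoogleCloudPlatform/kubernetes/pkg/health"
    "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/registrytest"
    "github.com/GoogleCloudPlatform/kubernetes/pkg/util"
)

// countingChecker records how often each host is actually probed.
type countingChecker struct {
    lock  sync.Mutex
    calls map[string]int
}

func (c *countingChecker) HealthCheck(host string) (health.Status, error) {
    c.lock.Lock()
    defer c.lock.Unlock()
    c.calls[host]++
    return health.Healthy, nil
}

func TestHealthChecksAreCached(t *testing.T) {
    checker := &countingChecker{calls: map[string]int{}}
    registry := NewHealthyRegistry(
        registrytest.NewMinionRegistry([]string{"m1", "m2"}, api.NodeResources{}),
        checker,
        &util.FakeClock{},
        20*time.Second,
    )
    ctx := api.NewContext()
    // Both listings fall inside the 20-second TTL, so each node is probed once.
    for i := 0; i < 2; i++ {
        if _, err := registry.ListMinions(ctx); err != nil {
            t.Fatalf("unexpected error: %v", err)
        }
    }
    if got := checker.calls["m1"]; got != 1 {
        t.Errorf("expected one probe for m1, got %v", got)
    }
}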

@@ -19,10 +19,12 @@ package minion
import (
"reflect"
"testing"
"time"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/health"
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/registrytest"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
)
type alwaysYes struct{}
@@ -34,10 +36,12 @@ func (alwaysYes) HealthCheck(host string) (health.Status, error) {
func TestBasicDelegation(t *testing.T) {
ctx := api.NewContext()
mockMinionRegistry := registrytest.NewMinionRegistry([]string{"m1", "m2", "m3"}, api.NodeResources{})
healthy := HealthyRegistry{
delegate: mockMinionRegistry,
client: alwaysYes{},
}
healthy := NewHealthyRegistry(
mockMinionRegistry,
alwaysYes{},
&util.FakeClock{},
60*time.Second,
)
list, err := healthy.ListMinions(ctx)
if err != nil {
t.Errorf("unexpected error: %v", err)
@@ -82,10 +86,12 @@ func (n *notMinion) HealthCheck(host string) (health.Status, error) {
func TestFiltering(t *testing.T) {
ctx := api.NewContext()
mockMinionRegistry := registrytest.NewMinionRegistry([]string{"m1", "m2", "m3"}, api.NodeResources{})
healthy := HealthyRegistry{
delegate: mockMinionRegistry,
client: &notMinion{minion: "m1"},
}
healthy := NewHealthyRegistry(
mockMinionRegistry,
&notMinion{minion: "m1"},
&util.FakeClock{},
60*time.Second,
)
expected := []string{"m1", "m2", "m3"}
list, err := healthy.ListMinions(ctx)
if err != nil {


@@ -18,11 +18,13 @@ package minion
import (
"testing"
"time"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/registrytest"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
)
func TestMinionRegistryREST(t *testing.T) {
@@ -89,12 +91,14 @@ func TestMinionRegistryREST(t *testing.T) {
func TestMinionRegistryHealthCheck(t *testing.T) {
minionRegistry := registrytest.NewMinionRegistry([]string{}, api.NodeResources{})
minionHealthRegistry := HealthyRegistry{
delegate: minionRegistry,
client: &notMinion{minion: "m1"},
}
minionHealthRegistry := NewHealthyRegistry(
minionRegistry,
&notMinion{minion: "m1"},
&util.FakeClock{},
60*time.Second,
)
ms := NewREST(&minionHealthRegistry)
ms := NewREST(minionHealthRegistry)
ctx := api.NewContext()
c, err := ms.Create(ctx, &api.Node{ObjectMeta: api.ObjectMeta{Name: "m1"}})

pkg/util/time_cache.go Normal file

@@ -0,0 +1,125 @@
/*
Copyright 2014 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package util
import (
"sync"
"time"
)
// T stands in for any type in TimeCache
// Should make it easy to use this as a template for an autogenerator
// if we ever start doing that.
type T interface{}
type TimeCache interface {
// Get will fetch an item from the cache if
// it is present and recent enough.
Get(key string) T
}
type timeCacheEntry struct {
item T
lastUpdate time.Time
}
type timeCache struct {
clock Clock
fillFunc func(string) T
ttl time.Duration
inFlight map[string]chan T
inFlightLock sync.Mutex
cache map[string]timeCacheEntry
lock sync.RWMutex
}
// NewTimeCache returns a cache which calls fill to fill its entries, and
// forgets entries after ttl has passed.
func NewTimeCache(clock Clock, ttl time.Duration, fill func(key string) T) TimeCache {
return &timeCache{
clock: clock,
fillFunc: fill,
inFlight: map[string]chan T{},
cache: map[string]timeCacheEntry{},
ttl: ttl,
}
}
// Get returns the value of key from the cache, if it is present
// and recent enough; otherwise, it blocks while it gets the value.
func (c *timeCache) Get(key string) T {
if item, ok := c.get(key); ok {
return item
}
// We need to fill the cache. Calling the function could be
// expensive, so do it while unlocked.
wait := c.fillOrWait(key)
item := <-wait
// Put it back in the channel in case there are multiple waiters
// (this channel is non-blocking)
wait <- item
return item
}
// returns the item and true if it is found and not expired, otherwise nil and false.
func (c *timeCache) get(key string) (T, bool) {
c.lock.RLock()
defer c.lock.RUnlock()
data, ok := c.cache[key]
now := c.clock.Now()
if !ok || now.Sub(data.lastUpdate) > c.ttl {
return nil, false
}
return data.item, true
}
func (c *timeCache) fillOrWait(key string) chan T {
c.inFlightLock.Lock()
defer c.inFlightLock.Unlock()
// Already a call in progress?
if current, ok := c.inFlight[key]; ok {
return current
}
// We are the first, so we have to make the call.
result := make(chan T, 1) // non-blocking
c.inFlight[key] = result
go func() {
// Make potentially slow call
data := timeCacheEntry{
item: c.fillFunc(key),
lastUpdate: c.clock.Now(),
}
result <- data.item
// Store in cache
c.lock.Lock()
c.cache[key] = data
c.lock.Unlock()
// Remove in flight entry
c.inFlightLock.Lock()
delete(c.inFlight, key)
c.inFlightLock.Unlock()
}()
return result
}
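Before the tests below, a minimal standalone usage sketch of the TimeCache defined above (slowLookup and the key name are illustrative): NewTimeCache takes a Clock, a TTL, and a fill function, and Get blocks only while a miss is being filled.

package main

import (
    "fmt"
    "time"

    "github.com/GoogleCloudPlatform/kubernetes/pkg/util"
)

func main() {
    // slowLookup stands in for any expensive per-key call (cloud API, health probe, ...).
    slowLookup := func(key string) util.T {
        time.Sleep(100 * time.Millisecond)
        return "value-for-" + key
    }
    cache := util.NewTimeCache(util.RealClock{}, 30*time.Second, slowLookup)

    fmt.Println(cache.Get("node-1").(string)) // first call blocks while slowLookup runs
    fmt.Println(cache.Get("node-1").(string)) // served from the cache for the next 30 seconds
}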

pkg/util/time_cache_test.go Normal file

@@ -0,0 +1,117 @@
/*
Copyright 2015 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package util
import (
"sync"
"testing"
"time"
fuzz "github.com/google/gofuzz"
)
func TestCacheExpire(t *testing.T) {
calls := map[string]int{}
ff := func(key string) T { calls[key]++; return key }
clock := &FakeClock{time.Now()}
c := NewTimeCache(clock, 60*time.Second, ff)
c.Get("foo")
c.Get("bar")
// This call should hit the cache, so we expect no additional calls
c.Get("foo")
// Advance the clock, this call should miss the cache, so expect one more call.
clock.Time = clock.Time.Add(61 * time.Second)
c.Get("foo")
c.Get("bar")
if e, a := 2, calls["foo"]; e != a {
t.Errorf("Wrong number of calls for foo: wanted %v, got %v", e, a)
}
if e, a := 2, calls["bar"]; e != a {
t.Errorf("Wrong number of calls for bar: wanted %v, got %v", e, a)
}
}
func TestCacheNotExpire(t *testing.T) {
calls := map[string]int{}
ff := func(key string) T { calls[key]++; return key }
clock := &FakeClock{time.Now()}
c := NewTimeCache(clock, 60*time.Second, ff)
c.Get("foo")
// This call should hit the cache, so we expect no additional calls to the fill function
clock.Time = clock.Time.Add(60 * time.Second)
c.Get("foo")
if e, a := 1, calls["foo"]; e != a {
t.Errorf("Wrong number of calls for foo: wanted %v, got %v", e, a)
}
}
func TestCacheParallel(t *testing.T) {
ff := func(key string) T { time.Sleep(time.Second); return key }
clock := &FakeClock{time.Now()}
c := NewTimeCache(clock, 60*time.Second, ff)
// Make some keys
keys := []string{}
fuzz.New().NilChance(0).NumElements(50, 50).Fuzz(&keys)
// If we have high parallelism, this will take only a second.
var wg sync.WaitGroup
wg.Add(len(keys))
for _, key := range keys {
go func(key string) {
c.Get(key)
wg.Done()
}(key)
}
wg.Wait()
}
func TestCacheParallelOneCall(t *testing.T) {
calls := 0
var callLock sync.Mutex
ff := func(key string) T {
time.Sleep(time.Second)
callLock.Lock()
defer callLock.Unlock()
calls++
return key
}
clock := &FakeClock{time.Now()}
c := NewTimeCache(clock, 60*time.Second, ff)
// If we have high parallelism, this will take only a second.
var wg sync.WaitGroup
wg.Add(50)
for i := 0; i < 50; i++ {
go func(key string) {
c.Get(key)
wg.Done()
}("aoeu")
}
wg.Wait()
// And if we wait for existing calls, we should have only one call.
if e, a := 1, calls; e != a {
t.Errorf("Expected %v, got %v", e, a)
}
}