From a7864aa230183d830b810ef3128b176d2f732ad9 Mon Sep 17 00:00:00 2001 From: Prashanth Balasubramanian Date: Mon, 30 Mar 2015 11:17:16 -0700 Subject: [PATCH] Scheduler uses TTLStore for assumed pods --- pkg/client/cache/expiration_cache.go | 189 +++++++++++++++++++ pkg/client/cache/expiration_cache_fakes.go | 52 ++++++ pkg/client/cache/expiration_cache_test.go | 133 ++++++++++++++ pkg/client/cache/store.go | 148 +++------------ pkg/client/cache/thread_safe_store.go | 201 +++++++++++++++++++++ pkg/client/request.go | 19 +- pkg/runtime/scheme.go | 3 +- pkg/util/clock.go | 11 ++ plugin/pkg/scheduler/modeler.go | 11 +- plugin/pkg/scheduler/scheduler_test.go | 150 +++++++++++++++ 10 files changed, 772 insertions(+), 145 deletions(-) create mode 100644 pkg/client/cache/expiration_cache.go create mode 100644 pkg/client/cache/expiration_cache_fakes.go create mode 100644 pkg/client/cache/expiration_cache_test.go create mode 100644 pkg/client/cache/thread_safe_store.go diff --git a/pkg/client/cache/expiration_cache.go b/pkg/client/cache/expiration_cache.go new file mode 100644 index 00000000000..dac736304a6 --- /dev/null +++ b/pkg/client/cache/expiration_cache.go @@ -0,0 +1,189 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cache + +import ( + "github.com/GoogleCloudPlatform/kubernetes/pkg/util" + "time" +) + +// ExpirationCache implements the store interface +// 1. All entries are automatically time stamped on insert +// a. The key is computed based off the original item/keyFunc +// b. The value inserted under that key is the timestamped item +// 2. Expiration happens lazily on read based on the expiration policy +// 3. Time-stamps are stripped off unexpired entries before return +type ExpirationCache struct { + cacheStorage ThreadSafeStore + keyFunc KeyFunc + clock util.Clock + expirationPolicy ExpirationPolicy +} + +// ExpirationPolicy dictates when an object expires. Currently only abstracted out +// so unittests don't rely on the system clock. +type ExpirationPolicy interface { + IsExpired(obj *timestampedEntry) bool +} + +// TTLPolicy implements a ttl based ExpirationPolicy. +type TTLPolicy struct { + // >0: Expire entries with an age > ttl + // <=0: Don't expire any entry + Ttl time.Duration + + // Clock used to calculate ttl expiration + Clock util.Clock +} + +// IsExpired returns true if the given object is older than the ttl, or it can't +// determine its age. +func (p *TTLPolicy) IsExpired(obj *timestampedEntry) bool { + return p.Ttl > 0 && p.Clock.Since(obj.timestamp) > p.Ttl +} + +// timestampedEntry is the only type allowed in a ExpirationCache. +type timestampedEntry struct { + obj interface{} + timestamp time.Time +} + +// getTimestampedEntry returnes the timestampedEntry stored under the given key. +func (c *ExpirationCache) getTimestampedEntry(key string) (*timestampedEntry, bool) { + item, _ := c.cacheStorage.Get(key) + // TODO: Check the cast instead + if tsEntry, ok := item.(*timestampedEntry); ok { + return tsEntry, true + } + return nil, false +} + +// getOrExpire retrieves the object from the timestampedEntry iff it hasn't +// already expired. It kicks-off a go routine to delete expired objects from +// the store and sets exists=false. +func (c *ExpirationCache) getOrExpire(key string) (interface{}, bool) { + timestampedItem, exists := c.getTimestampedEntry(key) + if !exists { + return nil, false + } + if c.expirationPolicy.IsExpired(timestampedItem) { + // Since expiration happens lazily on read, don't hold up + // the reader trying to acquire a write lock for the delete. + // The next reader will retry the delete even if this one + // fails; as long as we only return un-expired entries a + // reader doesn't need to wait for the result of the delete. + go func() { + defer util.HandleCrash() + c.cacheStorage.Delete(key) + }() + return nil, false + } + return timestampedItem.obj, true +} + +// GetByKey returns the item stored under the key, or sets exists=false. +func (c *ExpirationCache) GetByKey(key string) (interface{}, bool, error) { + obj, exists := c.getOrExpire(key) + return obj, exists, nil +} + +// Get returns unexpired items. It purges the cache of expired items in the +// process. +func (c *ExpirationCache) Get(obj interface{}) (interface{}, bool, error) { + key, err := c.keyFunc(obj) + if err != nil { + return nil, false, KeyError{obj, err} + } + obj, exists := c.getOrExpire(key) + return obj, exists, nil +} + +// List retrieves a list of unexpired items. It purges the cache of expired +// items in the process. +func (c *ExpirationCache) List() []interface{} { + items := c.cacheStorage.List() + + list := make([]interface{}, 0, len(items)) + for _, item := range items { + obj := item.(*timestampedEntry).obj + if key, err := c.keyFunc(obj); err != nil { + list = append(list, obj) + } else if obj, exists := c.getOrExpire(key); exists { + list = append(list, obj) + } + } + return list +} + +// ListKeys returns a list of all keys in the expiration cache. +func (c *ExpirationCache) ListKeys() []string { + return c.cacheStorage.ListKeys() +} + +// Add timestamps an item and inserts it into the cache, overwriting entries +// that might exist under the same key. +func (c *ExpirationCache) Add(obj interface{}) error { + key, err := c.keyFunc(obj) + if err != nil { + return KeyError{obj, err} + } + c.cacheStorage.Add(key, ×tampedEntry{obj, c.clock.Now()}) + return nil +} + +// Update has not been implemented yet for lack of a use case, so this method +// simply calls `Add`. This effectively refreshes the timestamp. +func (c *ExpirationCache) Update(obj interface{}) error { + return c.Add(obj) +} + +// Delete removes an item from the cache. +func (c *ExpirationCache) Delete(obj interface{}) error { + key, err := c.keyFunc(obj) + if err != nil { + return KeyError{obj, err} + } + c.cacheStorage.Delete(key) + return nil +} + +// Replace will convert all items in the given list to TimestampedEntries +// before attempting the replace operation. The replace operation will +// delete the contents of the ExpirationCache `c`. +func (c *ExpirationCache) Replace(list []interface{}) error { + items := map[string]interface{}{} + ts := c.clock.Now() + for _, item := range list { + key, err := c.keyFunc(item) + if err != nil { + return KeyError{item, err} + } + items[key] = ×tampedEntry{item, ts} + } + c.cacheStorage.Replace(items) + return nil +} + +// NewTTLStore creates and returns a ExpirationCache with a TTLPolicy +func NewTTLStore(keyFunc KeyFunc, ttl time.Duration) Store { + return &ExpirationCache{ + cacheStorage: NewThreadSafeStore(Indexers{}, Indices{}), + keyFunc: keyFunc, + clock: util.RealClock{}, + expirationPolicy: &TTLPolicy{ttl, util.RealClock{}}, + } +} diff --git a/pkg/client/cache/expiration_cache_fakes.go b/pkg/client/cache/expiration_cache_fakes.go new file mode 100644 index 00000000000..9df663e30d4 --- /dev/null +++ b/pkg/client/cache/expiration_cache_fakes.go @@ -0,0 +1,52 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cache + +import ( + "github.com/GoogleCloudPlatform/kubernetes/pkg/util" +) + +type fakeThreadSafeMap struct { + ThreadSafeStore + deletedKeys chan<- string +} + +func (c *fakeThreadSafeMap) Delete(key string) { + if c.deletedKeys != nil { + c.deletedKeys <- key + } +} + +type FakeExpirationPolicy struct { + NeverExpire util.StringSet + RetrieveKeyFunc KeyFunc +} + +func (p *FakeExpirationPolicy) IsExpired(obj *timestampedEntry) bool { + key, _ := p.RetrieveKeyFunc(obj) + return !p.NeverExpire.Has(key) +} + +func NewFakeExpirationStore(keyFunc KeyFunc, deletedKeys chan<- string, expirationPolicy ExpirationPolicy, cacheClock util.Clock) Store { + cacheStorage := NewThreadSafeStore(Indexers{}, Indices{}) + return &ExpirationCache{ + cacheStorage: &fakeThreadSafeMap{cacheStorage, deletedKeys}, + keyFunc: keyFunc, + clock: cacheClock, + expirationPolicy: expirationPolicy, + } +} diff --git a/pkg/client/cache/expiration_cache_test.go b/pkg/client/cache/expiration_cache_test.go new file mode 100644 index 00000000000..7c66d0180a0 --- /dev/null +++ b/pkg/client/cache/expiration_cache_test.go @@ -0,0 +1,133 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cache + +import ( + "github.com/GoogleCloudPlatform/kubernetes/pkg/util" + "reflect" + "testing" + "time" +) + +func TestTTLExpirationBasic(t *testing.T) { + testObj := testStoreObject{id: "foo", val: "bar"} + deleteChan := make(chan string) + ttlStore := NewFakeExpirationStore( + testStoreKeyFunc, deleteChan, + &FakeExpirationPolicy{ + NeverExpire: util.NewStringSet(), + RetrieveKeyFunc: func(obj interface{}) (string, error) { + return obj.(*timestampedEntry).obj.(testStoreObject).id, nil + }, + }, + util.RealClock{}, + ) + err := ttlStore.Add(testObj) + if err != nil { + t.Errorf("Unable to add obj %#v", testObj) + } + item, exists, err := ttlStore.Get(testObj) + if err != nil { + t.Errorf("Failed to get from store, %v", err) + } + if exists || item != nil { + t.Errorf("Got unexpected item %#v", item) + } + key, _ := testStoreKeyFunc(testObj) + select { + case delKey := <-deleteChan: + if delKey != key { + t.Errorf("Unexpected delete for key %s", key) + } + case <-time.After(time.Millisecond * 100): + t.Errorf("Unexpected timeout waiting on delete") + } + close(deleteChan) +} + +func TestTTLList(t *testing.T) { + testObjs := []testStoreObject{ + {id: "foo", val: "bar"}, + {id: "foo1", val: "bar1"}, + {id: "foo2", val: "bar2"}, + } + expireKeys := util.NewStringSet(testObjs[0].id, testObjs[2].id) + deleteChan := make(chan string) + defer close(deleteChan) + + ttlStore := NewFakeExpirationStore( + testStoreKeyFunc, deleteChan, + &FakeExpirationPolicy{ + NeverExpire: util.NewStringSet(testObjs[1].id), + RetrieveKeyFunc: func(obj interface{}) (string, error) { + return obj.(*timestampedEntry).obj.(testStoreObject).id, nil + }, + }, + util.RealClock{}, + ) + for _, obj := range testObjs { + err := ttlStore.Add(obj) + if err != nil { + t.Errorf("Unable to add obj %#v", obj) + } + } + listObjs := ttlStore.List() + if len(listObjs) != 1 || !reflect.DeepEqual(listObjs[0], testObjs[1]) { + t.Errorf("List returned unexpected results %#v", listObjs) + } + + // Make sure all our deletes come through in an acceptable rate (1/100ms) + for expireKeys.Len() != 0 { + select { + case delKey := <-deleteChan: + if !expireKeys.Has(delKey) { + t.Errorf("Unexpected delete for key %s", delKey) + } + expireKeys.Delete(delKey) + case <-time.After(time.Millisecond * 100): + t.Errorf("Unexpected timeout waiting on delete") + return + } + } +} + +func TestTTLPolicy(t *testing.T) { + fakeTime := time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC) + ttl := 30 * time.Second + exactlyOnTTL := fakeTime.Add(-ttl) + expiredTime := fakeTime.Add(-(ttl + 1)) + + policy := TTLPolicy{ttl, &util.FakeClock{fakeTime}} + fakeTimestampedEntry := ×tampedEntry{obj: struct{}{}, timestamp: exactlyOnTTL} + if policy.IsExpired(fakeTimestampedEntry) { + t.Errorf("TTL cache should not expire entries exactly on ttl") + } + fakeTimestampedEntry.timestamp = fakeTime + if policy.IsExpired(fakeTimestampedEntry) { + t.Errorf("TTL Cache should not expire entires before ttl") + } + fakeTimestampedEntry.timestamp = expiredTime + if !policy.IsExpired(fakeTimestampedEntry) { + t.Errorf("TTL Cache should expire entries older than ttl") + } + for _, ttl = range []time.Duration{0, -1} { + policy.Ttl = ttl + if policy.IsExpired(fakeTimestampedEntry) { + t.Errorf("TTL policy should only expire entries when initialized with a ttl > 0") + } + } +} diff --git a/pkg/client/cache/store.go b/pkg/client/cache/store.go index 93d71035967..f42d6e178e1 100644 --- a/pkg/client/cache/store.go +++ b/pkg/client/cache/store.go @@ -18,10 +18,7 @@ package cache import ( "fmt" - "sync" - "github.com/GoogleCloudPlatform/kubernetes/pkg/api/meta" - "github.com/GoogleCloudPlatform/kubernetes/pkg/util" ) // Store is a generic object storage interface. Reflector knows how to watch a server @@ -77,16 +74,15 @@ func MetaNamespaceKeyFunc(obj interface{}) (string, error) { return meta.Name(), nil } +// cache responsibilities are limited to: +// 1. Computing keys for objects via keyFunc +// 2. Invoking methods of a ThreadSafeStorage interface type cache struct { - lock sync.RWMutex - items map[string]interface{} + // cacheStorage bears the burden of thread safety for the cache + cacheStorage ThreadSafeStore // keyFunc is used to make the key for objects stored in and retrieved from items, and // should be deterministic. keyFunc KeyFunc - // indexers maps a name to an IndexFunc - indexers Indexers - // indices maps a name to an Index - indices Indices } // Add inserts an item into the cache. @@ -95,66 +91,7 @@ func (c *cache) Add(obj interface{}) error { if err != nil { return KeyError{obj, err} } - // keep a pointer to whatever could have been there previously - c.lock.Lock() - defer c.lock.Unlock() - oldObject := c.items[key] - c.items[key] = obj - c.updateIndices(oldObject, obj) - return nil -} - -// updateIndices modifies the objects location in the managed indexes, if this is an update, you must provide an oldObj -// updateIndices must be called from a function that already has a lock on the cache -func (c *cache) updateIndices(oldObj interface{}, newObj interface{}) error { - // if we got an old object, we need to remove it before we add it again - if oldObj != nil { - c.deleteFromIndices(oldObj) - } - key, err := c.keyFunc(newObj) - if err != nil { - return err - } - for name, indexFunc := range c.indexers { - indexValue, err := indexFunc(newObj) - if err != nil { - return err - } - index := c.indices[name] - if index == nil { - index = Index{} - c.indices[name] = index - } - set := index[indexValue] - if set == nil { - set = util.StringSet{} - index[indexValue] = set - } - set.Insert(key) - } - return nil -} - -// deleteFromIndices removes the object from each of the managed indexes -// it is intended to be called from a function that already has a lock on the cache -func (c *cache) deleteFromIndices(obj interface{}) error { - key, err := c.keyFunc(obj) - if err != nil { - return err - } - for name, indexFunc := range c.indexers { - indexValue, err := indexFunc(obj) - if err != nil { - return err - } - index := c.indices[name] - if index != nil { - set := index[indexValue] - if set != nil { - set.Delete(key) - } - } - } + c.cacheStorage.Add(key, obj) return nil } @@ -164,11 +101,7 @@ func (c *cache) Update(obj interface{}) error { if err != nil { return KeyError{obj, err} } - c.lock.Lock() - defer c.lock.Unlock() - oldObject := c.items[key] - c.items[key] = obj - c.updateIndices(oldObject, obj) + c.cacheStorage.Update(key, obj) return nil } @@ -178,59 +111,26 @@ func (c *cache) Delete(obj interface{}) error { if err != nil { return KeyError{obj, err} } - c.lock.Lock() - defer c.lock.Unlock() - delete(c.items, key) - c.deleteFromIndices(obj) + c.cacheStorage.Delete(key) return nil } // List returns a list of all the items. // List is completely threadsafe as long as you treat all items as immutable. func (c *cache) List() []interface{} { - c.lock.RLock() - defer c.lock.RUnlock() - list := make([]interface{}, 0, len(c.items)) - for _, item := range c.items { - list = append(list, item) - } - return list + return c.cacheStorage.List() } // ListKeys returns a list of all the keys of the objects currently // in the cache. func (c *cache) ListKeys() []string { - c.lock.RLock() - defer c.lock.RUnlock() - list := make([]string, 0, len(c.items)) - for key := range c.items { - list = append(list, key) - } - return list + return c.cacheStorage.ListKeys() } // Index returns a list of items that match on the index function // Index is thread-safe so long as you treat all items as immutable func (c *cache) Index(indexName string, obj interface{}) ([]interface{}, error) { - c.lock.RLock() - defer c.lock.RUnlock() - - indexFunc := c.indexers[indexName] - if indexFunc == nil { - return nil, fmt.Errorf("Index with name %s does not exist", indexName) - } - - indexKey, err := indexFunc(obj) - if err != nil { - return nil, err - } - index := c.indices[indexName] - set := index[indexKey] - list := make([]interface{}, 0, set.Len()) - for _, key := range set.List() { - list = append(list, c.items[key]) - } - return list, nil + return c.cacheStorage.Index(indexName, obj) } // Get returns the requested item, or sets exists=false. @@ -246,9 +146,7 @@ func (c *cache) Get(obj interface{}) (item interface{}, exists bool, err error) // GetByKey returns the request item, or exists=false. // GetByKey is completely threadsafe as long as you treat all items as immutable. func (c *cache) GetByKey(key string) (item interface{}, exists bool, err error) { - c.lock.RLock() - defer c.lock.RUnlock() - item, exists = c.items[key] + item, exists = c.cacheStorage.Get(key) return item, exists, nil } @@ -264,26 +162,22 @@ func (c *cache) Replace(list []interface{}) error { } items[key] = item } - - c.lock.Lock() - defer c.lock.Unlock() - c.items = items - - // rebuild any index - c.indices = Indices{} - for _, item := range c.items { - c.updateIndices(nil, item) - } - + c.cacheStorage.Replace(items) return nil } // NewStore returns a Store implemented simply with a map and a lock. func NewStore(keyFunc KeyFunc) Store { - return &cache{items: map[string]interface{}{}, keyFunc: keyFunc, indexers: Indexers{}, indices: Indices{}} + return &cache{ + cacheStorage: NewThreadSafeStore(Indexers{}, Indices{}), + keyFunc: keyFunc, + } } // NewIndexer returns an Indexer implemented simply with a map and a lock. func NewIndexer(keyFunc KeyFunc, indexers Indexers) Indexer { - return &cache{items: map[string]interface{}{}, keyFunc: keyFunc, indexers: indexers, indices: Indices{}} + return &cache{ + cacheStorage: NewThreadSafeStore(indexers, Indices{}), + keyFunc: keyFunc, + } } diff --git a/pkg/client/cache/thread_safe_store.go b/pkg/client/cache/thread_safe_store.go new file mode 100644 index 00000000000..09671d28992 --- /dev/null +++ b/pkg/client/cache/thread_safe_store.go @@ -0,0 +1,201 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cache + +import ( + "fmt" + "sync" + + "github.com/GoogleCloudPlatform/kubernetes/pkg/util" +) + +// ThreadSafeStore is an interface that allows concurrent access to a storage backend. +// TL;DR caveats: you must not modify anything returned by Get or List as it will break +// the indexing feature in addition to not being thread safe. +// +// The guarantees of thread safety provided by List/Get are only valid if the caller +// treats returned items as read-only. For example, a pointer inserted in the store +// through `Add` will be returned as is by `Get`. Multiple clients might invoke `Get` +// on the same key and modify the pointer in a non-thread-safe way. Also note that +// modifying objects stored by the indexers (if any) will *not* automatically lead +// to a re-index. So it's not a good idea to directly modify the objects returned by +// Get/List, in general. +type ThreadSafeStore interface { + Add(key string, obj interface{}) + Update(key string, obj interface{}) + Delete(key string) + Get(key string) (item interface{}, exists bool) + List() []interface{} + ListKeys() []string + Replace(map[string]interface{}) + Index(indexName string, obj interface{}) ([]interface{}, error) +} + +// threadSafeMap implements ThreadSafeStore +type threadSafeMap struct { + lock sync.RWMutex + items map[string]interface{} + + // indexers maps a name to an IndexFunc + indexers Indexers + // indices maps a name to an Index + indices Indices +} + +func (c *threadSafeMap) Add(key string, obj interface{}) { + c.lock.Lock() + defer c.lock.Unlock() + oldObject := c.items[key] + c.items[key] = obj + c.updateIndices(oldObject, obj, key) +} + +func (c *threadSafeMap) Update(key string, obj interface{}) { + c.lock.Lock() + defer c.lock.Unlock() + oldObject := c.items[key] + c.items[key] = obj + c.updateIndices(oldObject, obj, key) +} + +func (c *threadSafeMap) Delete(key string) { + c.lock.Lock() + defer c.lock.Unlock() + if obj, exists := c.items[key]; exists { + c.deleteFromIndices(obj, key) + delete(c.items, key) + } +} + +func (c *threadSafeMap) Get(key string) (item interface{}, exists bool) { + c.lock.RLock() + defer c.lock.RUnlock() + item, exists = c.items[key] + return item, exists +} + +func (c *threadSafeMap) List() []interface{} { + c.lock.RLock() + defer c.lock.RUnlock() + list := make([]interface{}, 0, len(c.items)) + for _, item := range c.items { + list = append(list, item) + } + return list +} + +// ListKeys returns a list of all the keys of the objects currently +// in the threadSafeMap. +func (c *threadSafeMap) ListKeys() []string { + c.lock.RLock() + defer c.lock.RUnlock() + list := make([]string, 0, len(c.items)) + for key := range c.items { + list = append(list, key) + } + return list +} + +func (c *threadSafeMap) Replace(items map[string]interface{}) { + c.lock.Lock() + defer c.lock.Unlock() + c.items = items + + // rebuild any index + c.indices = Indices{} + for key, item := range c.items { + c.updateIndices(nil, item, key) + } +} + +// Index returns a list of items that match on the index function +// Index is thread-safe so long as you treat all items as immutable +func (c *threadSafeMap) Index(indexName string, obj interface{}) ([]interface{}, error) { + c.lock.RLock() + defer c.lock.RUnlock() + + indexFunc := c.indexers[indexName] + if indexFunc == nil { + return nil, fmt.Errorf("Index with name %s does not exist", indexName) + } + + indexKey, err := indexFunc(obj) + if err != nil { + return nil, err + } + index := c.indices[indexName] + set := index[indexKey] + list := make([]interface{}, 0, set.Len()) + for _, key := range set.List() { + list = append(list, c.items[key]) + } + return list, nil +} + +// updateIndices modifies the objects location in the managed indexes, if this is an update, you must provide an oldObj +// updateIndices must be called from a function that already has a lock on the cache +func (c *threadSafeMap) updateIndices(oldObj interface{}, newObj interface{}, key string) error { + // if we got an old object, we need to remove it before we add it again + if oldObj != nil { + c.deleteFromIndices(oldObj, key) + } + for name, indexFunc := range c.indexers { + indexValue, err := indexFunc(newObj) + if err != nil { + return err + } + index := c.indices[name] + if index == nil { + index = Index{} + c.indices[name] = index + } + set := index[indexValue] + if set == nil { + set = util.StringSet{} + index[indexValue] = set + } + set.Insert(key) + } + return nil +} + +// deleteFromIndices removes the object from each of the managed indexes +// it is intended to be called from a function that already has a lock on the cache +func (c *threadSafeMap) deleteFromIndices(obj interface{}, key string) error { + for name, indexFunc := range c.indexers { + indexValue, err := indexFunc(obj) + if err != nil { + return err + } + index := c.indices[name] + if index != nil { + set := index[indexValue] + if set != nil { + set.Delete(key) + } + } + } + return nil +} + +func NewThreadSafeStore(indexers Indexers, indices Indices) ThreadSafeStore { + return &threadSafeMap{ + items: map[string]interface{}{}, + indexers: indexers, + indices: Indices{}, + } +} diff --git a/pkg/client/request.go b/pkg/client/request.go index 7f52805433a..24d904bcd7b 100644 --- a/pkg/client/request.go +++ b/pkg/client/request.go @@ -20,16 +20,6 @@ import ( "bytes" "crypto/tls" "fmt" - "io" - "io/ioutil" - "mime" - "net/http" - "net/url" - "path" - "strconv" - "strings" - "time" - "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors" "github.com/GoogleCloudPlatform/kubernetes/pkg/client/metrics" @@ -41,6 +31,15 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" watchjson "github.com/GoogleCloudPlatform/kubernetes/pkg/watch/json" "github.com/golang/glog" + "io" + "io/ioutil" + "mime" + "net/http" + "net/url" + "path" + "strconv" + "strings" + "time" ) // specialParams lists parameters that are handled specially and which users of Request diff --git a/pkg/runtime/scheme.go b/pkg/runtime/scheme.go index bbbd38889e7..0e0ed67070b 100644 --- a/pkg/runtime/scheme.go +++ b/pkg/runtime/scheme.go @@ -18,10 +18,9 @@ package runtime import ( "fmt" + "github.com/GoogleCloudPlatform/kubernetes/pkg/conversion" "net/url" "reflect" - - "github.com/GoogleCloudPlatform/kubernetes/pkg/conversion" ) // Scheme defines methods for serializing and deserializing API objects. It diff --git a/pkg/util/clock.go b/pkg/util/clock.go index 0ef99f99daf..f391b81c5d8 100644 --- a/pkg/util/clock.go +++ b/pkg/util/clock.go @@ -24,6 +24,7 @@ import ( // needs to do arbitrary things based on time. type Clock interface { Now() time.Time + Since(time.Time) time.Duration } // RealClock really calls time.Now() @@ -34,6 +35,11 @@ func (r RealClock) Now() time.Time { return time.Now() } +// Since returns time since the specified timestamp. +func (r RealClock) Since(ts time.Time) time.Duration { + return time.Since(ts) +} + // FakeClock implements Clock, but returns an arbitary time. type FakeClock struct { Time time.Time @@ -43,3 +49,8 @@ type FakeClock struct { func (f *FakeClock) Now() time.Time { return f.Time } + +// Since returns time since the time in f. +func (f *FakeClock) Since(ts time.Time) time.Duration { + return f.Time.Sub(ts) +} diff --git a/plugin/pkg/scheduler/modeler.go b/plugin/pkg/scheduler/modeler.go index 7833b5b94b6..0e7839b579c 100644 --- a/plugin/pkg/scheduler/modeler.go +++ b/plugin/pkg/scheduler/modeler.go @@ -20,6 +20,7 @@ import ( "fmt" "strings" "sync" + "time" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/client/cache" @@ -95,7 +96,9 @@ func NewSimpleModeler(queuedPods, scheduledPods ExtendedPodLister) *SimpleModele return &SimpleModeler{ queuedPods: queuedPods, scheduledPods: scheduledPods, - assumedPods: &cache.StoreToPodLister{cache.NewStore(cache.MetaNamespaceKeyFunc)}, + assumedPods: &cache.StoreToPodLister{ + cache.NewTTLStore(cache.MetaNamespaceKeyFunc, 30*time.Second), + }, } } @@ -124,10 +127,6 @@ func (s *SimpleModeler) listPods(selector labels.Selector) (pods []api.Pod, err // Since the assumed list will be short, just check every one. // Goal here is to stop making assumptions about a pod once it shows // up in one of these other lists. - // TODO: there's a possibility that a pod could get deleted at the - // exact wrong time and linger in assumedPods forever. So we - // need go through that periodically and check for deleted - // pods. for _, pod := range assumed { qExist, err := s.queuedPods.Exists(&pod) if err != nil { @@ -151,7 +150,7 @@ func (s *SimpleModeler) listPods(selector labels.Selector) (pods []api.Pod, err if err != nil { return nil, err } - // re-get in case we deleted any. + // Listing purges the ttl cache and re-gets, in case we deleted any entries. assumed, err = s.assumedPods.List(selector) if err != nil { return nil, err diff --git a/plugin/pkg/scheduler/scheduler_test.go b/plugin/pkg/scheduler/scheduler_test.go index 46355598626..41b8c0cf040 100644 --- a/plugin/pkg/scheduler/scheduler_test.go +++ b/plugin/pkg/scheduler/scheduler_test.go @@ -18,11 +18,14 @@ package scheduler import ( "errors" + "math/rand" "reflect" "testing" + "time" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/api/testapi" + "github.com/GoogleCloudPlatform/kubernetes/pkg/client/cache" "github.com/GoogleCloudPlatform/kubernetes/pkg/client/record" "github.com/GoogleCloudPlatform/kubernetes/pkg/scheduler" "github.com/GoogleCloudPlatform/kubernetes/pkg/util" @@ -43,6 +46,14 @@ func podWithID(id, desiredHost string) *api.Pod { } } +func podWithPort(id, desiredHost string, port int) *api.Pod { + pod := podWithID(id, desiredHost) + pod.Spec.Containers = []api.Container{ + {Name: "ctr", Ports: []api.ContainerPort{{HostPort: port}}}, + } + return pod +} + type mockScheduler struct { machine string err error @@ -144,3 +155,142 @@ func TestScheduler(t *testing.T) { events.Stop() } } + +func TestSchedulerForgetAssumedPodAfterDelete(t *testing.T) { + eventBroadcaster := record.NewBroadcaster() + defer eventBroadcaster.StartLogging(t.Logf).Stop() + + // Setup modeler so we control the contents of all 3 stores: assumed, + // scheduled and queued + scheduledPodStore := cache.NewStore(cache.MetaNamespaceKeyFunc) + scheduledPodLister := &cache.StoreToPodLister{scheduledPodStore} + + queuedPodStore := cache.NewFIFO(cache.MetaNamespaceKeyFunc) + queuedPodLister := &cache.StoreToPodLister{queuedPodStore} + + modeler := NewSimpleModeler(queuedPodLister, scheduledPodLister) + + // Create a fake clock used to timestamp entries and calculate ttl. Nothing + // will expire till we flip to something older than the ttl, at which point + // all entries inserted with fakeTime will expire. + ttl := 30 * time.Second + fakeTime := time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC) + fakeClock := &util.FakeClock{fakeTime} + ttlPolicy := &cache.TTLPolicy{ttl, fakeClock} + assumedPodsStore := cache.NewFakeExpirationStore( + cache.MetaNamespaceKeyFunc, nil, ttlPolicy, fakeClock) + modeler.assumedPods = &cache.StoreToPodLister{assumedPodsStore} + + // Port is the easiest way to cause a fit predicate failure + podPort := 8080 + firstPod := podWithPort("foo", "", podPort) + + // Create the scheduler config + algo := scheduler.NewGenericScheduler( + map[string]scheduler.FitPredicate{"PodFitsPorts": scheduler.PodFitsPorts}, + []scheduler.PriorityConfig{}, + modeler.PodLister(), + rand.New(rand.NewSource(time.Now().UnixNano()))) + + var gotBinding *api.Binding + c := &Config{ + Modeler: modeler, + MinionLister: scheduler.FakeMinionLister( + api.NodeList{Items: []api.Node{{ObjectMeta: api.ObjectMeta{Name: "machine1"}}}}, + ), + Algorithm: algo, + Binder: fakeBinder{func(b *api.Binding) error { + scheduledPodStore.Add(podWithPort(b.Name, b.Target.Name, podPort)) + gotBinding = b + return nil + }}, + NextPod: func() *api.Pod { + return queuedPodStore.Pop().(*api.Pod) + }, + Error: func(p *api.Pod, err error) { + t.Errorf("Unexpected error when scheduling pod %+v: %v", p, err) + }, + Recorder: eventBroadcaster.NewRecorder(api.EventSource{Component: "scheduler"}), + } + + // First scheduling pass should schedule the pod + s := New(c) + called := make(chan struct{}) + events := eventBroadcaster.StartEventWatcher(func(e *api.Event) { + if e, a := "scheduled", e.Reason; e != a { + t.Errorf("expected %v, got %v", e, a) + } + close(called) + }) + + queuedPodStore.Add(firstPod) + // queuedPodStore: [foo:8080] + // scheduledPodStore: [] + // assumedPods: [] + + s.scheduleOne() + // queuedPodStore: [] + // scheduledPodStore: [foo:8080] + // assumedPods: [foo:8080] + + pod, exists, _ := scheduledPodStore.GetByKey("foo") + if !exists { + t.Errorf("Expected scheduled pod store to contain pod") + } + pod, exists, _ = queuedPodStore.GetByKey("foo") + if exists { + t.Errorf("Did not expect a queued pod, found %+v", pod) + } + pod, exists, _ = assumedPodsStore.GetByKey("foo") + if !exists { + t.Errorf("Assumed pod store should contain stale pod") + } + + expectBind := &api.Binding{ + ObjectMeta: api.ObjectMeta{Name: "foo"}, + Target: api.ObjectReference{Kind: "Node", Name: "machine1"}, + } + if ex, ac := expectBind, gotBinding; !reflect.DeepEqual(ex, ac) { + t.Errorf("Expected exact match on binding: %s", util.ObjectDiff(ex, ac)) + } + + <-called + events.Stop() + + scheduledPodStore.Delete(pod) + _, exists, _ = assumedPodsStore.Get(pod) + if !exists { + t.Errorf("Expected pod %#v in assumed pod store", pod) + } + + secondPod := podWithPort("bar", "", podPort) + queuedPodStore.Add(secondPod) + // queuedPodStore: [bar:8080] + // scheduledPodStore: [] + // assumedPods: [foo:8080] + + // Second scheduling pass will fail to schedule if the store hasn't expired + // the deleted pod. This would normally happen with a timeout. + //expirationPolicy.NeverExpire = util.NewStringSet() + fakeClock.Time = fakeClock.Time.Add(ttl + 1) + + called = make(chan struct{}) + events = eventBroadcaster.StartEventWatcher(func(e *api.Event) { + if e, a := "scheduled", e.Reason; e != a { + t.Errorf("expected %v, got %v", e, a) + } + close(called) + }) + + s.scheduleOne() + + expectBind = &api.Binding{ + ObjectMeta: api.ObjectMeta{Name: "bar"}, + Target: api.ObjectReference{Kind: "Node", Name: "machine1"}, + } + if ex, ac := expectBind, gotBinding; !reflect.DeepEqual(ex, ac) { + t.Errorf("Expected exact match on binding: %s", util.ObjectDiff(ex, ac)) + } + <-called + events.Stop() +}