mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-08-02 16:29:21 +00:00
Merge pull request #6179 from bprashanth/ts_ttl_scheduler
Modeler uses a ttl store for assumed pods
This commit is contained in:
commit
43ec88fda5
189
pkg/client/cache/expiration_cache.go
vendored
Normal file
189
pkg/client/cache/expiration_cache.go
vendored
Normal file
@ -0,0 +1,189 @@
|
|||||||
|
/*
|
||||||
|
Copyright 2014 Google Inc. All rights reserved.
|
||||||
|
|
||||||
|
Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
you may not use this file except in compliance with the License.
|
||||||
|
You may obtain a copy of the License at
|
||||||
|
|
||||||
|
http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
|
||||||
|
Unless required by applicable law or agreed to in writing, software
|
||||||
|
distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
See the License for the specific language governing permissions and
|
||||||
|
limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package cache
|
||||||
|
|
||||||
|
import (
|
||||||
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
// ExpirationCache implements the store interface
|
||||||
|
// 1. All entries are automatically time stamped on insert
|
||||||
|
// a. The key is computed based off the original item/keyFunc
|
||||||
|
// b. The value inserted under that key is the timestamped item
|
||||||
|
// 2. Expiration happens lazily on read based on the expiration policy
|
||||||
|
// 3. Time-stamps are stripped off unexpired entries before return
|
||||||
|
type ExpirationCache struct {
|
||||||
|
cacheStorage ThreadSafeStore
|
||||||
|
keyFunc KeyFunc
|
||||||
|
clock util.Clock
|
||||||
|
expirationPolicy ExpirationPolicy
|
||||||
|
}
|
||||||
|
|
||||||
|
// ExpirationPolicy dictates when an object expires. Currently only abstracted out
|
||||||
|
// so unittests don't rely on the system clock.
|
||||||
|
type ExpirationPolicy interface {
|
||||||
|
IsExpired(obj *timestampedEntry) bool
|
||||||
|
}
|
||||||
|
|
||||||
|
// TTLPolicy implements a ttl based ExpirationPolicy.
|
||||||
|
type TTLPolicy struct {
|
||||||
|
// >0: Expire entries with an age > ttl
|
||||||
|
// <=0: Don't expire any entry
|
||||||
|
Ttl time.Duration
|
||||||
|
|
||||||
|
// Clock used to calculate ttl expiration
|
||||||
|
Clock util.Clock
|
||||||
|
}
|
||||||
|
|
||||||
|
// IsExpired returns true if the given object is older than the ttl, or it can't
|
||||||
|
// determine its age.
|
||||||
|
func (p *TTLPolicy) IsExpired(obj *timestampedEntry) bool {
|
||||||
|
return p.Ttl > 0 && p.Clock.Since(obj.timestamp) > p.Ttl
|
||||||
|
}
|
||||||
|
|
||||||
|
// timestampedEntry is the only type allowed in a ExpirationCache.
|
||||||
|
type timestampedEntry struct {
|
||||||
|
obj interface{}
|
||||||
|
timestamp time.Time
|
||||||
|
}
|
||||||
|
|
||||||
|
// getTimestampedEntry returnes the timestampedEntry stored under the given key.
|
||||||
|
func (c *ExpirationCache) getTimestampedEntry(key string) (*timestampedEntry, bool) {
|
||||||
|
item, _ := c.cacheStorage.Get(key)
|
||||||
|
// TODO: Check the cast instead
|
||||||
|
if tsEntry, ok := item.(*timestampedEntry); ok {
|
||||||
|
return tsEntry, true
|
||||||
|
}
|
||||||
|
return nil, false
|
||||||
|
}
|
||||||
|
|
||||||
|
// getOrExpire retrieves the object from the timestampedEntry iff it hasn't
|
||||||
|
// already expired. It kicks-off a go routine to delete expired objects from
|
||||||
|
// the store and sets exists=false.
|
||||||
|
func (c *ExpirationCache) getOrExpire(key string) (interface{}, bool) {
|
||||||
|
timestampedItem, exists := c.getTimestampedEntry(key)
|
||||||
|
if !exists {
|
||||||
|
return nil, false
|
||||||
|
}
|
||||||
|
if c.expirationPolicy.IsExpired(timestampedItem) {
|
||||||
|
// Since expiration happens lazily on read, don't hold up
|
||||||
|
// the reader trying to acquire a write lock for the delete.
|
||||||
|
// The next reader will retry the delete even if this one
|
||||||
|
// fails; as long as we only return un-expired entries a
|
||||||
|
// reader doesn't need to wait for the result of the delete.
|
||||||
|
go func() {
|
||||||
|
defer util.HandleCrash()
|
||||||
|
c.cacheStorage.Delete(key)
|
||||||
|
}()
|
||||||
|
return nil, false
|
||||||
|
}
|
||||||
|
return timestampedItem.obj, true
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetByKey returns the item stored under the key, or sets exists=false.
|
||||||
|
func (c *ExpirationCache) GetByKey(key string) (interface{}, bool, error) {
|
||||||
|
obj, exists := c.getOrExpire(key)
|
||||||
|
return obj, exists, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Get returns unexpired items. It purges the cache of expired items in the
|
||||||
|
// process.
|
||||||
|
func (c *ExpirationCache) Get(obj interface{}) (interface{}, bool, error) {
|
||||||
|
key, err := c.keyFunc(obj)
|
||||||
|
if err != nil {
|
||||||
|
return nil, false, KeyError{obj, err}
|
||||||
|
}
|
||||||
|
obj, exists := c.getOrExpire(key)
|
||||||
|
return obj, exists, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// List retrieves a list of unexpired items. It purges the cache of expired
|
||||||
|
// items in the process.
|
||||||
|
func (c *ExpirationCache) List() []interface{} {
|
||||||
|
items := c.cacheStorage.List()
|
||||||
|
|
||||||
|
list := make([]interface{}, 0, len(items))
|
||||||
|
for _, item := range items {
|
||||||
|
obj := item.(*timestampedEntry).obj
|
||||||
|
if key, err := c.keyFunc(obj); err != nil {
|
||||||
|
list = append(list, obj)
|
||||||
|
} else if obj, exists := c.getOrExpire(key); exists {
|
||||||
|
list = append(list, obj)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return list
|
||||||
|
}
|
||||||
|
|
||||||
|
// ListKeys returns a list of all keys in the expiration cache.
|
||||||
|
func (c *ExpirationCache) ListKeys() []string {
|
||||||
|
return c.cacheStorage.ListKeys()
|
||||||
|
}
|
||||||
|
|
||||||
|
// Add timestamps an item and inserts it into the cache, overwriting entries
|
||||||
|
// that might exist under the same key.
|
||||||
|
func (c *ExpirationCache) Add(obj interface{}) error {
|
||||||
|
key, err := c.keyFunc(obj)
|
||||||
|
if err != nil {
|
||||||
|
return KeyError{obj, err}
|
||||||
|
}
|
||||||
|
c.cacheStorage.Add(key, ×tampedEntry{obj, c.clock.Now()})
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Update has not been implemented yet for lack of a use case, so this method
|
||||||
|
// simply calls `Add`. This effectively refreshes the timestamp.
|
||||||
|
func (c *ExpirationCache) Update(obj interface{}) error {
|
||||||
|
return c.Add(obj)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Delete removes an item from the cache.
|
||||||
|
func (c *ExpirationCache) Delete(obj interface{}) error {
|
||||||
|
key, err := c.keyFunc(obj)
|
||||||
|
if err != nil {
|
||||||
|
return KeyError{obj, err}
|
||||||
|
}
|
||||||
|
c.cacheStorage.Delete(key)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Replace will convert all items in the given list to TimestampedEntries
|
||||||
|
// before attempting the replace operation. The replace operation will
|
||||||
|
// delete the contents of the ExpirationCache `c`.
|
||||||
|
func (c *ExpirationCache) Replace(list []interface{}) error {
|
||||||
|
items := map[string]interface{}{}
|
||||||
|
ts := c.clock.Now()
|
||||||
|
for _, item := range list {
|
||||||
|
key, err := c.keyFunc(item)
|
||||||
|
if err != nil {
|
||||||
|
return KeyError{item, err}
|
||||||
|
}
|
||||||
|
items[key] = ×tampedEntry{item, ts}
|
||||||
|
}
|
||||||
|
c.cacheStorage.Replace(items)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewTTLStore creates and returns a ExpirationCache with a TTLPolicy
|
||||||
|
func NewTTLStore(keyFunc KeyFunc, ttl time.Duration) Store {
|
||||||
|
return &ExpirationCache{
|
||||||
|
cacheStorage: NewThreadSafeStore(Indexers{}, Indices{}),
|
||||||
|
keyFunc: keyFunc,
|
||||||
|
clock: util.RealClock{},
|
||||||
|
expirationPolicy: &TTLPolicy{ttl, util.RealClock{}},
|
||||||
|
}
|
||||||
|
}
|
52
pkg/client/cache/expiration_cache_fakes.go
vendored
Normal file
52
pkg/client/cache/expiration_cache_fakes.go
vendored
Normal file
@ -0,0 +1,52 @@
|
|||||||
|
/*
|
||||||
|
Copyright 2014 Google Inc. All rights reserved.
|
||||||
|
|
||||||
|
Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
you may not use this file except in compliance with the License.
|
||||||
|
You may obtain a copy of the License at
|
||||||
|
|
||||||
|
http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
|
||||||
|
Unless required by applicable law or agreed to in writing, software
|
||||||
|
distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
See the License for the specific language governing permissions and
|
||||||
|
limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package cache
|
||||||
|
|
||||||
|
import (
|
||||||
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
|
||||||
|
)
|
||||||
|
|
||||||
|
type fakeThreadSafeMap struct {
|
||||||
|
ThreadSafeStore
|
||||||
|
deletedKeys chan<- string
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *fakeThreadSafeMap) Delete(key string) {
|
||||||
|
if c.deletedKeys != nil {
|
||||||
|
c.deletedKeys <- key
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
type FakeExpirationPolicy struct {
|
||||||
|
NeverExpire util.StringSet
|
||||||
|
RetrieveKeyFunc KeyFunc
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *FakeExpirationPolicy) IsExpired(obj *timestampedEntry) bool {
|
||||||
|
key, _ := p.RetrieveKeyFunc(obj)
|
||||||
|
return !p.NeverExpire.Has(key)
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewFakeExpirationStore(keyFunc KeyFunc, deletedKeys chan<- string, expirationPolicy ExpirationPolicy, cacheClock util.Clock) Store {
|
||||||
|
cacheStorage := NewThreadSafeStore(Indexers{}, Indices{})
|
||||||
|
return &ExpirationCache{
|
||||||
|
cacheStorage: &fakeThreadSafeMap{cacheStorage, deletedKeys},
|
||||||
|
keyFunc: keyFunc,
|
||||||
|
clock: cacheClock,
|
||||||
|
expirationPolicy: expirationPolicy,
|
||||||
|
}
|
||||||
|
}
|
133
pkg/client/cache/expiration_cache_test.go
vendored
Normal file
133
pkg/client/cache/expiration_cache_test.go
vendored
Normal file
@ -0,0 +1,133 @@
|
|||||||
|
/*
|
||||||
|
Copyright 2014 Google Inc. All rights reserved.
|
||||||
|
|
||||||
|
Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
you may not use this file except in compliance with the License.
|
||||||
|
You may obtain a copy of the License at
|
||||||
|
|
||||||
|
http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
|
||||||
|
Unless required by applicable law or agreed to in writing, software
|
||||||
|
distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
See the License for the specific language governing permissions and
|
||||||
|
limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package cache
|
||||||
|
|
||||||
|
import (
|
||||||
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
|
||||||
|
"reflect"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestTTLExpirationBasic(t *testing.T) {
|
||||||
|
testObj := testStoreObject{id: "foo", val: "bar"}
|
||||||
|
deleteChan := make(chan string)
|
||||||
|
ttlStore := NewFakeExpirationStore(
|
||||||
|
testStoreKeyFunc, deleteChan,
|
||||||
|
&FakeExpirationPolicy{
|
||||||
|
NeverExpire: util.NewStringSet(),
|
||||||
|
RetrieveKeyFunc: func(obj interface{}) (string, error) {
|
||||||
|
return obj.(*timestampedEntry).obj.(testStoreObject).id, nil
|
||||||
|
},
|
||||||
|
},
|
||||||
|
util.RealClock{},
|
||||||
|
)
|
||||||
|
err := ttlStore.Add(testObj)
|
||||||
|
if err != nil {
|
||||||
|
t.Errorf("Unable to add obj %#v", testObj)
|
||||||
|
}
|
||||||
|
item, exists, err := ttlStore.Get(testObj)
|
||||||
|
if err != nil {
|
||||||
|
t.Errorf("Failed to get from store, %v", err)
|
||||||
|
}
|
||||||
|
if exists || item != nil {
|
||||||
|
t.Errorf("Got unexpected item %#v", item)
|
||||||
|
}
|
||||||
|
key, _ := testStoreKeyFunc(testObj)
|
||||||
|
select {
|
||||||
|
case delKey := <-deleteChan:
|
||||||
|
if delKey != key {
|
||||||
|
t.Errorf("Unexpected delete for key %s", key)
|
||||||
|
}
|
||||||
|
case <-time.After(time.Millisecond * 100):
|
||||||
|
t.Errorf("Unexpected timeout waiting on delete")
|
||||||
|
}
|
||||||
|
close(deleteChan)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestTTLList(t *testing.T) {
|
||||||
|
testObjs := []testStoreObject{
|
||||||
|
{id: "foo", val: "bar"},
|
||||||
|
{id: "foo1", val: "bar1"},
|
||||||
|
{id: "foo2", val: "bar2"},
|
||||||
|
}
|
||||||
|
expireKeys := util.NewStringSet(testObjs[0].id, testObjs[2].id)
|
||||||
|
deleteChan := make(chan string)
|
||||||
|
defer close(deleteChan)
|
||||||
|
|
||||||
|
ttlStore := NewFakeExpirationStore(
|
||||||
|
testStoreKeyFunc, deleteChan,
|
||||||
|
&FakeExpirationPolicy{
|
||||||
|
NeverExpire: util.NewStringSet(testObjs[1].id),
|
||||||
|
RetrieveKeyFunc: func(obj interface{}) (string, error) {
|
||||||
|
return obj.(*timestampedEntry).obj.(testStoreObject).id, nil
|
||||||
|
},
|
||||||
|
},
|
||||||
|
util.RealClock{},
|
||||||
|
)
|
||||||
|
for _, obj := range testObjs {
|
||||||
|
err := ttlStore.Add(obj)
|
||||||
|
if err != nil {
|
||||||
|
t.Errorf("Unable to add obj %#v", obj)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
listObjs := ttlStore.List()
|
||||||
|
if len(listObjs) != 1 || !reflect.DeepEqual(listObjs[0], testObjs[1]) {
|
||||||
|
t.Errorf("List returned unexpected results %#v", listObjs)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Make sure all our deletes come through in an acceptable rate (1/100ms)
|
||||||
|
for expireKeys.Len() != 0 {
|
||||||
|
select {
|
||||||
|
case delKey := <-deleteChan:
|
||||||
|
if !expireKeys.Has(delKey) {
|
||||||
|
t.Errorf("Unexpected delete for key %s", delKey)
|
||||||
|
}
|
||||||
|
expireKeys.Delete(delKey)
|
||||||
|
case <-time.After(time.Millisecond * 100):
|
||||||
|
t.Errorf("Unexpected timeout waiting on delete")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestTTLPolicy(t *testing.T) {
|
||||||
|
fakeTime := time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC)
|
||||||
|
ttl := 30 * time.Second
|
||||||
|
exactlyOnTTL := fakeTime.Add(-ttl)
|
||||||
|
expiredTime := fakeTime.Add(-(ttl + 1))
|
||||||
|
|
||||||
|
policy := TTLPolicy{ttl, &util.FakeClock{fakeTime}}
|
||||||
|
fakeTimestampedEntry := ×tampedEntry{obj: struct{}{}, timestamp: exactlyOnTTL}
|
||||||
|
if policy.IsExpired(fakeTimestampedEntry) {
|
||||||
|
t.Errorf("TTL cache should not expire entries exactly on ttl")
|
||||||
|
}
|
||||||
|
fakeTimestampedEntry.timestamp = fakeTime
|
||||||
|
if policy.IsExpired(fakeTimestampedEntry) {
|
||||||
|
t.Errorf("TTL Cache should not expire entires before ttl")
|
||||||
|
}
|
||||||
|
fakeTimestampedEntry.timestamp = expiredTime
|
||||||
|
if !policy.IsExpired(fakeTimestampedEntry) {
|
||||||
|
t.Errorf("TTL Cache should expire entries older than ttl")
|
||||||
|
}
|
||||||
|
for _, ttl = range []time.Duration{0, -1} {
|
||||||
|
policy.Ttl = ttl
|
||||||
|
if policy.IsExpired(fakeTimestampedEntry) {
|
||||||
|
t.Errorf("TTL policy should only expire entries when initialized with a ttl > 0")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
148
pkg/client/cache/store.go
vendored
148
pkg/client/cache/store.go
vendored
@ -18,10 +18,7 @@ package cache
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"sync"
|
|
||||||
|
|
||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/meta"
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/meta"
|
||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// Store is a generic object storage interface. Reflector knows how to watch a server
|
// Store is a generic object storage interface. Reflector knows how to watch a server
|
||||||
@ -77,16 +74,15 @@ func MetaNamespaceKeyFunc(obj interface{}) (string, error) {
|
|||||||
return meta.Name(), nil
|
return meta.Name(), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// cache responsibilities are limited to:
|
||||||
|
// 1. Computing keys for objects via keyFunc
|
||||||
|
// 2. Invoking methods of a ThreadSafeStorage interface
|
||||||
type cache struct {
|
type cache struct {
|
||||||
lock sync.RWMutex
|
// cacheStorage bears the burden of thread safety for the cache
|
||||||
items map[string]interface{}
|
cacheStorage ThreadSafeStore
|
||||||
// keyFunc is used to make the key for objects stored in and retrieved from items, and
|
// keyFunc is used to make the key for objects stored in and retrieved from items, and
|
||||||
// should be deterministic.
|
// should be deterministic.
|
||||||
keyFunc KeyFunc
|
keyFunc KeyFunc
|
||||||
// indexers maps a name to an IndexFunc
|
|
||||||
indexers Indexers
|
|
||||||
// indices maps a name to an Index
|
|
||||||
indices Indices
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Add inserts an item into the cache.
|
// Add inserts an item into the cache.
|
||||||
@ -95,66 +91,7 @@ func (c *cache) Add(obj interface{}) error {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return KeyError{obj, err}
|
return KeyError{obj, err}
|
||||||
}
|
}
|
||||||
// keep a pointer to whatever could have been there previously
|
c.cacheStorage.Add(key, obj)
|
||||||
c.lock.Lock()
|
|
||||||
defer c.lock.Unlock()
|
|
||||||
oldObject := c.items[key]
|
|
||||||
c.items[key] = obj
|
|
||||||
c.updateIndices(oldObject, obj)
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// updateIndices modifies the objects location in the managed indexes, if this is an update, you must provide an oldObj
|
|
||||||
// updateIndices must be called from a function that already has a lock on the cache
|
|
||||||
func (c *cache) updateIndices(oldObj interface{}, newObj interface{}) error {
|
|
||||||
// if we got an old object, we need to remove it before we add it again
|
|
||||||
if oldObj != nil {
|
|
||||||
c.deleteFromIndices(oldObj)
|
|
||||||
}
|
|
||||||
key, err := c.keyFunc(newObj)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
for name, indexFunc := range c.indexers {
|
|
||||||
indexValue, err := indexFunc(newObj)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
index := c.indices[name]
|
|
||||||
if index == nil {
|
|
||||||
index = Index{}
|
|
||||||
c.indices[name] = index
|
|
||||||
}
|
|
||||||
set := index[indexValue]
|
|
||||||
if set == nil {
|
|
||||||
set = util.StringSet{}
|
|
||||||
index[indexValue] = set
|
|
||||||
}
|
|
||||||
set.Insert(key)
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// deleteFromIndices removes the object from each of the managed indexes
|
|
||||||
// it is intended to be called from a function that already has a lock on the cache
|
|
||||||
func (c *cache) deleteFromIndices(obj interface{}) error {
|
|
||||||
key, err := c.keyFunc(obj)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
for name, indexFunc := range c.indexers {
|
|
||||||
indexValue, err := indexFunc(obj)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
index := c.indices[name]
|
|
||||||
if index != nil {
|
|
||||||
set := index[indexValue]
|
|
||||||
if set != nil {
|
|
||||||
set.Delete(key)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -164,11 +101,7 @@ func (c *cache) Update(obj interface{}) error {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return KeyError{obj, err}
|
return KeyError{obj, err}
|
||||||
}
|
}
|
||||||
c.lock.Lock()
|
c.cacheStorage.Update(key, obj)
|
||||||
defer c.lock.Unlock()
|
|
||||||
oldObject := c.items[key]
|
|
||||||
c.items[key] = obj
|
|
||||||
c.updateIndices(oldObject, obj)
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -178,59 +111,26 @@ func (c *cache) Delete(obj interface{}) error {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return KeyError{obj, err}
|
return KeyError{obj, err}
|
||||||
}
|
}
|
||||||
c.lock.Lock()
|
c.cacheStorage.Delete(key)
|
||||||
defer c.lock.Unlock()
|
|
||||||
delete(c.items, key)
|
|
||||||
c.deleteFromIndices(obj)
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// List returns a list of all the items.
|
// List returns a list of all the items.
|
||||||
// List is completely threadsafe as long as you treat all items as immutable.
|
// List is completely threadsafe as long as you treat all items as immutable.
|
||||||
func (c *cache) List() []interface{} {
|
func (c *cache) List() []interface{} {
|
||||||
c.lock.RLock()
|
return c.cacheStorage.List()
|
||||||
defer c.lock.RUnlock()
|
|
||||||
list := make([]interface{}, 0, len(c.items))
|
|
||||||
for _, item := range c.items {
|
|
||||||
list = append(list, item)
|
|
||||||
}
|
|
||||||
return list
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// ListKeys returns a list of all the keys of the objects currently
|
// ListKeys returns a list of all the keys of the objects currently
|
||||||
// in the cache.
|
// in the cache.
|
||||||
func (c *cache) ListKeys() []string {
|
func (c *cache) ListKeys() []string {
|
||||||
c.lock.RLock()
|
return c.cacheStorage.ListKeys()
|
||||||
defer c.lock.RUnlock()
|
|
||||||
list := make([]string, 0, len(c.items))
|
|
||||||
for key := range c.items {
|
|
||||||
list = append(list, key)
|
|
||||||
}
|
|
||||||
return list
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Index returns a list of items that match on the index function
|
// Index returns a list of items that match on the index function
|
||||||
// Index is thread-safe so long as you treat all items as immutable
|
// Index is thread-safe so long as you treat all items as immutable
|
||||||
func (c *cache) Index(indexName string, obj interface{}) ([]interface{}, error) {
|
func (c *cache) Index(indexName string, obj interface{}) ([]interface{}, error) {
|
||||||
c.lock.RLock()
|
return c.cacheStorage.Index(indexName, obj)
|
||||||
defer c.lock.RUnlock()
|
|
||||||
|
|
||||||
indexFunc := c.indexers[indexName]
|
|
||||||
if indexFunc == nil {
|
|
||||||
return nil, fmt.Errorf("Index with name %s does not exist", indexName)
|
|
||||||
}
|
|
||||||
|
|
||||||
indexKey, err := indexFunc(obj)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
index := c.indices[indexName]
|
|
||||||
set := index[indexKey]
|
|
||||||
list := make([]interface{}, 0, set.Len())
|
|
||||||
for _, key := range set.List() {
|
|
||||||
list = append(list, c.items[key])
|
|
||||||
}
|
|
||||||
return list, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Get returns the requested item, or sets exists=false.
|
// Get returns the requested item, or sets exists=false.
|
||||||
@ -246,9 +146,7 @@ func (c *cache) Get(obj interface{}) (item interface{}, exists bool, err error)
|
|||||||
// GetByKey returns the request item, or exists=false.
|
// GetByKey returns the request item, or exists=false.
|
||||||
// GetByKey is completely threadsafe as long as you treat all items as immutable.
|
// GetByKey is completely threadsafe as long as you treat all items as immutable.
|
||||||
func (c *cache) GetByKey(key string) (item interface{}, exists bool, err error) {
|
func (c *cache) GetByKey(key string) (item interface{}, exists bool, err error) {
|
||||||
c.lock.RLock()
|
item, exists = c.cacheStorage.Get(key)
|
||||||
defer c.lock.RUnlock()
|
|
||||||
item, exists = c.items[key]
|
|
||||||
return item, exists, nil
|
return item, exists, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -264,26 +162,22 @@ func (c *cache) Replace(list []interface{}) error {
|
|||||||
}
|
}
|
||||||
items[key] = item
|
items[key] = item
|
||||||
}
|
}
|
||||||
|
c.cacheStorage.Replace(items)
|
||||||
c.lock.Lock()
|
|
||||||
defer c.lock.Unlock()
|
|
||||||
c.items = items
|
|
||||||
|
|
||||||
// rebuild any index
|
|
||||||
c.indices = Indices{}
|
|
||||||
for _, item := range c.items {
|
|
||||||
c.updateIndices(nil, item)
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewStore returns a Store implemented simply with a map and a lock.
|
// NewStore returns a Store implemented simply with a map and a lock.
|
||||||
func NewStore(keyFunc KeyFunc) Store {
|
func NewStore(keyFunc KeyFunc) Store {
|
||||||
return &cache{items: map[string]interface{}{}, keyFunc: keyFunc, indexers: Indexers{}, indices: Indices{}}
|
return &cache{
|
||||||
|
cacheStorage: NewThreadSafeStore(Indexers{}, Indices{}),
|
||||||
|
keyFunc: keyFunc,
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewIndexer returns an Indexer implemented simply with a map and a lock.
|
// NewIndexer returns an Indexer implemented simply with a map and a lock.
|
||||||
func NewIndexer(keyFunc KeyFunc, indexers Indexers) Indexer {
|
func NewIndexer(keyFunc KeyFunc, indexers Indexers) Indexer {
|
||||||
return &cache{items: map[string]interface{}{}, keyFunc: keyFunc, indexers: indexers, indices: Indices{}}
|
return &cache{
|
||||||
|
cacheStorage: NewThreadSafeStore(indexers, Indices{}),
|
||||||
|
keyFunc: keyFunc,
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
201
pkg/client/cache/thread_safe_store.go
vendored
Normal file
201
pkg/client/cache/thread_safe_store.go
vendored
Normal file
@ -0,0 +1,201 @@
|
|||||||
|
/*
|
||||||
|
Copyright 2014 Google Inc. All rights reserved.
|
||||||
|
|
||||||
|
Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
you may not use this file except in compliance with the License.
|
||||||
|
You may obtain a copy of the License at
|
||||||
|
|
||||||
|
http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
|
||||||
|
Unless required by applicable law or agreed to in writing, software
|
||||||
|
distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
See the License for the specific language governing permissions and
|
||||||
|
limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package cache
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"sync"
|
||||||
|
|
||||||
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
|
||||||
|
)
|
||||||
|
|
||||||
|
// ThreadSafeStore is an interface that allows concurrent access to a storage backend.
|
||||||
|
// TL;DR caveats: you must not modify anything returned by Get or List as it will break
|
||||||
|
// the indexing feature in addition to not being thread safe.
|
||||||
|
//
|
||||||
|
// The guarantees of thread safety provided by List/Get are only valid if the caller
|
||||||
|
// treats returned items as read-only. For example, a pointer inserted in the store
|
||||||
|
// through `Add` will be returned as is by `Get`. Multiple clients might invoke `Get`
|
||||||
|
// on the same key and modify the pointer in a non-thread-safe way. Also note that
|
||||||
|
// modifying objects stored by the indexers (if any) will *not* automatically lead
|
||||||
|
// to a re-index. So it's not a good idea to directly modify the objects returned by
|
||||||
|
// Get/List, in general.
|
||||||
|
type ThreadSafeStore interface {
|
||||||
|
Add(key string, obj interface{})
|
||||||
|
Update(key string, obj interface{})
|
||||||
|
Delete(key string)
|
||||||
|
Get(key string) (item interface{}, exists bool)
|
||||||
|
List() []interface{}
|
||||||
|
ListKeys() []string
|
||||||
|
Replace(map[string]interface{})
|
||||||
|
Index(indexName string, obj interface{}) ([]interface{}, error)
|
||||||
|
}
|
||||||
|
|
||||||
|
// threadSafeMap implements ThreadSafeStore
|
||||||
|
type threadSafeMap struct {
|
||||||
|
lock sync.RWMutex
|
||||||
|
items map[string]interface{}
|
||||||
|
|
||||||
|
// indexers maps a name to an IndexFunc
|
||||||
|
indexers Indexers
|
||||||
|
// indices maps a name to an Index
|
||||||
|
indices Indices
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *threadSafeMap) Add(key string, obj interface{}) {
|
||||||
|
c.lock.Lock()
|
||||||
|
defer c.lock.Unlock()
|
||||||
|
oldObject := c.items[key]
|
||||||
|
c.items[key] = obj
|
||||||
|
c.updateIndices(oldObject, obj, key)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *threadSafeMap) Update(key string, obj interface{}) {
|
||||||
|
c.lock.Lock()
|
||||||
|
defer c.lock.Unlock()
|
||||||
|
oldObject := c.items[key]
|
||||||
|
c.items[key] = obj
|
||||||
|
c.updateIndices(oldObject, obj, key)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *threadSafeMap) Delete(key string) {
|
||||||
|
c.lock.Lock()
|
||||||
|
defer c.lock.Unlock()
|
||||||
|
if obj, exists := c.items[key]; exists {
|
||||||
|
c.deleteFromIndices(obj, key)
|
||||||
|
delete(c.items, key)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *threadSafeMap) Get(key string) (item interface{}, exists bool) {
|
||||||
|
c.lock.RLock()
|
||||||
|
defer c.lock.RUnlock()
|
||||||
|
item, exists = c.items[key]
|
||||||
|
return item, exists
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *threadSafeMap) List() []interface{} {
|
||||||
|
c.lock.RLock()
|
||||||
|
defer c.lock.RUnlock()
|
||||||
|
list := make([]interface{}, 0, len(c.items))
|
||||||
|
for _, item := range c.items {
|
||||||
|
list = append(list, item)
|
||||||
|
}
|
||||||
|
return list
|
||||||
|
}
|
||||||
|
|
||||||
|
// ListKeys returns a list of all the keys of the objects currently
|
||||||
|
// in the threadSafeMap.
|
||||||
|
func (c *threadSafeMap) ListKeys() []string {
|
||||||
|
c.lock.RLock()
|
||||||
|
defer c.lock.RUnlock()
|
||||||
|
list := make([]string, 0, len(c.items))
|
||||||
|
for key := range c.items {
|
||||||
|
list = append(list, key)
|
||||||
|
}
|
||||||
|
return list
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *threadSafeMap) Replace(items map[string]interface{}) {
|
||||||
|
c.lock.Lock()
|
||||||
|
defer c.lock.Unlock()
|
||||||
|
c.items = items
|
||||||
|
|
||||||
|
// rebuild any index
|
||||||
|
c.indices = Indices{}
|
||||||
|
for key, item := range c.items {
|
||||||
|
c.updateIndices(nil, item, key)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Index returns a list of items that match on the index function
|
||||||
|
// Index is thread-safe so long as you treat all items as immutable
|
||||||
|
func (c *threadSafeMap) Index(indexName string, obj interface{}) ([]interface{}, error) {
|
||||||
|
c.lock.RLock()
|
||||||
|
defer c.lock.RUnlock()
|
||||||
|
|
||||||
|
indexFunc := c.indexers[indexName]
|
||||||
|
if indexFunc == nil {
|
||||||
|
return nil, fmt.Errorf("Index with name %s does not exist", indexName)
|
||||||
|
}
|
||||||
|
|
||||||
|
indexKey, err := indexFunc(obj)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
index := c.indices[indexName]
|
||||||
|
set := index[indexKey]
|
||||||
|
list := make([]interface{}, 0, set.Len())
|
||||||
|
for _, key := range set.List() {
|
||||||
|
list = append(list, c.items[key])
|
||||||
|
}
|
||||||
|
return list, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// updateIndices modifies the objects location in the managed indexes, if this is an update, you must provide an oldObj
|
||||||
|
// updateIndices must be called from a function that already has a lock on the cache
|
||||||
|
func (c *threadSafeMap) updateIndices(oldObj interface{}, newObj interface{}, key string) error {
|
||||||
|
// if we got an old object, we need to remove it before we add it again
|
||||||
|
if oldObj != nil {
|
||||||
|
c.deleteFromIndices(oldObj, key)
|
||||||
|
}
|
||||||
|
for name, indexFunc := range c.indexers {
|
||||||
|
indexValue, err := indexFunc(newObj)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
index := c.indices[name]
|
||||||
|
if index == nil {
|
||||||
|
index = Index{}
|
||||||
|
c.indices[name] = index
|
||||||
|
}
|
||||||
|
set := index[indexValue]
|
||||||
|
if set == nil {
|
||||||
|
set = util.StringSet{}
|
||||||
|
index[indexValue] = set
|
||||||
|
}
|
||||||
|
set.Insert(key)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// deleteFromIndices removes the object from each of the managed indexes
|
||||||
|
// it is intended to be called from a function that already has a lock on the cache
|
||||||
|
func (c *threadSafeMap) deleteFromIndices(obj interface{}, key string) error {
|
||||||
|
for name, indexFunc := range c.indexers {
|
||||||
|
indexValue, err := indexFunc(obj)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
index := c.indices[name]
|
||||||
|
if index != nil {
|
||||||
|
set := index[indexValue]
|
||||||
|
if set != nil {
|
||||||
|
set.Delete(key)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewThreadSafeStore(indexers Indexers, indices Indices) ThreadSafeStore {
|
||||||
|
return &threadSafeMap{
|
||||||
|
items: map[string]interface{}{},
|
||||||
|
indexers: indexers,
|
||||||
|
indices: Indices{},
|
||||||
|
}
|
||||||
|
}
|
@ -20,16 +20,6 @@ import (
|
|||||||
"bytes"
|
"bytes"
|
||||||
"crypto/tls"
|
"crypto/tls"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
|
||||||
"io/ioutil"
|
|
||||||
"mime"
|
|
||||||
"net/http"
|
|
||||||
"net/url"
|
|
||||||
"path"
|
|
||||||
"strconv"
|
|
||||||
"strings"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
|
||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors"
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors"
|
||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/client/metrics"
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/client/metrics"
|
||||||
@ -41,6 +31,15 @@ import (
|
|||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
|
||||||
watchjson "github.com/GoogleCloudPlatform/kubernetes/pkg/watch/json"
|
watchjson "github.com/GoogleCloudPlatform/kubernetes/pkg/watch/json"
|
||||||
"github.com/golang/glog"
|
"github.com/golang/glog"
|
||||||
|
"io"
|
||||||
|
"io/ioutil"
|
||||||
|
"mime"
|
||||||
|
"net/http"
|
||||||
|
"net/url"
|
||||||
|
"path"
|
||||||
|
"strconv"
|
||||||
|
"strings"
|
||||||
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
// specialParams lists parameters that are handled specially and which users of Request
|
// specialParams lists parameters that are handled specially and which users of Request
|
||||||
|
@ -18,10 +18,9 @@ package runtime
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/conversion"
|
||||||
"net/url"
|
"net/url"
|
||||||
"reflect"
|
"reflect"
|
||||||
|
|
||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/conversion"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// Scheme defines methods for serializing and deserializing API objects. It
|
// Scheme defines methods for serializing and deserializing API objects. It
|
||||||
|
@ -24,6 +24,7 @@ import (
|
|||||||
// needs to do arbitrary things based on time.
|
// needs to do arbitrary things based on time.
|
||||||
type Clock interface {
|
type Clock interface {
|
||||||
Now() time.Time
|
Now() time.Time
|
||||||
|
Since(time.Time) time.Duration
|
||||||
}
|
}
|
||||||
|
|
||||||
// RealClock really calls time.Now()
|
// RealClock really calls time.Now()
|
||||||
@ -34,6 +35,11 @@ func (r RealClock) Now() time.Time {
|
|||||||
return time.Now()
|
return time.Now()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Since returns time since the specified timestamp.
|
||||||
|
func (r RealClock) Since(ts time.Time) time.Duration {
|
||||||
|
return time.Since(ts)
|
||||||
|
}
|
||||||
|
|
||||||
// FakeClock implements Clock, but returns an arbitary time.
|
// FakeClock implements Clock, but returns an arbitary time.
|
||||||
type FakeClock struct {
|
type FakeClock struct {
|
||||||
Time time.Time
|
Time time.Time
|
||||||
@ -43,3 +49,8 @@ type FakeClock struct {
|
|||||||
func (f *FakeClock) Now() time.Time {
|
func (f *FakeClock) Now() time.Time {
|
||||||
return f.Time
|
return f.Time
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Since returns time since the time in f.
|
||||||
|
func (f *FakeClock) Since(ts time.Time) time.Duration {
|
||||||
|
return f.Time.Sub(ts)
|
||||||
|
}
|
||||||
|
@ -20,6 +20,7 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
"strings"
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
|
||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/client/cache"
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/client/cache"
|
||||||
@ -95,7 +96,9 @@ func NewSimpleModeler(queuedPods, scheduledPods ExtendedPodLister) *SimpleModele
|
|||||||
return &SimpleModeler{
|
return &SimpleModeler{
|
||||||
queuedPods: queuedPods,
|
queuedPods: queuedPods,
|
||||||
scheduledPods: scheduledPods,
|
scheduledPods: scheduledPods,
|
||||||
assumedPods: &cache.StoreToPodLister{cache.NewStore(cache.MetaNamespaceKeyFunc)},
|
assumedPods: &cache.StoreToPodLister{
|
||||||
|
cache.NewTTLStore(cache.MetaNamespaceKeyFunc, 30*time.Second),
|
||||||
|
},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -124,10 +127,6 @@ func (s *SimpleModeler) listPods(selector labels.Selector) (pods []api.Pod, err
|
|||||||
// Since the assumed list will be short, just check every one.
|
// Since the assumed list will be short, just check every one.
|
||||||
// Goal here is to stop making assumptions about a pod once it shows
|
// Goal here is to stop making assumptions about a pod once it shows
|
||||||
// up in one of these other lists.
|
// up in one of these other lists.
|
||||||
// TODO: there's a possibility that a pod could get deleted at the
|
|
||||||
// exact wrong time and linger in assumedPods forever. So we
|
|
||||||
// need go through that periodically and check for deleted
|
|
||||||
// pods.
|
|
||||||
for _, pod := range assumed {
|
for _, pod := range assumed {
|
||||||
qExist, err := s.queuedPods.Exists(&pod)
|
qExist, err := s.queuedPods.Exists(&pod)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -151,7 +150,7 @@ func (s *SimpleModeler) listPods(selector labels.Selector) (pods []api.Pod, err
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
// re-get in case we deleted any.
|
// Listing purges the ttl cache and re-gets, in case we deleted any entries.
|
||||||
assumed, err = s.assumedPods.List(selector)
|
assumed, err = s.assumedPods.List(selector)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
@ -18,11 +18,14 @@ package scheduler
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"errors"
|
"errors"
|
||||||
|
"math/rand"
|
||||||
"reflect"
|
"reflect"
|
||||||
"testing"
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
|
||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/testapi"
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/testapi"
|
||||||
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/client/cache"
|
||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/client/record"
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/client/record"
|
||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/scheduler"
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/scheduler"
|
||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
|
||||||
@ -43,6 +46,14 @@ func podWithID(id, desiredHost string) *api.Pod {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func podWithPort(id, desiredHost string, port int) *api.Pod {
|
||||||
|
pod := podWithID(id, desiredHost)
|
||||||
|
pod.Spec.Containers = []api.Container{
|
||||||
|
{Name: "ctr", Ports: []api.ContainerPort{{HostPort: port}}},
|
||||||
|
}
|
||||||
|
return pod
|
||||||
|
}
|
||||||
|
|
||||||
type mockScheduler struct {
|
type mockScheduler struct {
|
||||||
machine string
|
machine string
|
||||||
err error
|
err error
|
||||||
@ -144,3 +155,142 @@ func TestScheduler(t *testing.T) {
|
|||||||
events.Stop()
|
events.Stop()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestSchedulerForgetAssumedPodAfterDelete(t *testing.T) {
|
||||||
|
eventBroadcaster := record.NewBroadcaster()
|
||||||
|
defer eventBroadcaster.StartLogging(t.Logf).Stop()
|
||||||
|
|
||||||
|
// Setup modeler so we control the contents of all 3 stores: assumed,
|
||||||
|
// scheduled and queued
|
||||||
|
scheduledPodStore := cache.NewStore(cache.MetaNamespaceKeyFunc)
|
||||||
|
scheduledPodLister := &cache.StoreToPodLister{scheduledPodStore}
|
||||||
|
|
||||||
|
queuedPodStore := cache.NewFIFO(cache.MetaNamespaceKeyFunc)
|
||||||
|
queuedPodLister := &cache.StoreToPodLister{queuedPodStore}
|
||||||
|
|
||||||
|
modeler := NewSimpleModeler(queuedPodLister, scheduledPodLister)
|
||||||
|
|
||||||
|
// Create a fake clock used to timestamp entries and calculate ttl. Nothing
|
||||||
|
// will expire till we flip to something older than the ttl, at which point
|
||||||
|
// all entries inserted with fakeTime will expire.
|
||||||
|
ttl := 30 * time.Second
|
||||||
|
fakeTime := time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC)
|
||||||
|
fakeClock := &util.FakeClock{fakeTime}
|
||||||
|
ttlPolicy := &cache.TTLPolicy{ttl, fakeClock}
|
||||||
|
assumedPodsStore := cache.NewFakeExpirationStore(
|
||||||
|
cache.MetaNamespaceKeyFunc, nil, ttlPolicy, fakeClock)
|
||||||
|
modeler.assumedPods = &cache.StoreToPodLister{assumedPodsStore}
|
||||||
|
|
||||||
|
// Port is the easiest way to cause a fit predicate failure
|
||||||
|
podPort := 8080
|
||||||
|
firstPod := podWithPort("foo", "", podPort)
|
||||||
|
|
||||||
|
// Create the scheduler config
|
||||||
|
algo := scheduler.NewGenericScheduler(
|
||||||
|
map[string]scheduler.FitPredicate{"PodFitsPorts": scheduler.PodFitsPorts},
|
||||||
|
[]scheduler.PriorityConfig{},
|
||||||
|
modeler.PodLister(),
|
||||||
|
rand.New(rand.NewSource(time.Now().UnixNano())))
|
||||||
|
|
||||||
|
var gotBinding *api.Binding
|
||||||
|
c := &Config{
|
||||||
|
Modeler: modeler,
|
||||||
|
MinionLister: scheduler.FakeMinionLister(
|
||||||
|
api.NodeList{Items: []api.Node{{ObjectMeta: api.ObjectMeta{Name: "machine1"}}}},
|
||||||
|
),
|
||||||
|
Algorithm: algo,
|
||||||
|
Binder: fakeBinder{func(b *api.Binding) error {
|
||||||
|
scheduledPodStore.Add(podWithPort(b.Name, b.Target.Name, podPort))
|
||||||
|
gotBinding = b
|
||||||
|
return nil
|
||||||
|
}},
|
||||||
|
NextPod: func() *api.Pod {
|
||||||
|
return queuedPodStore.Pop().(*api.Pod)
|
||||||
|
},
|
||||||
|
Error: func(p *api.Pod, err error) {
|
||||||
|
t.Errorf("Unexpected error when scheduling pod %+v: %v", p, err)
|
||||||
|
},
|
||||||
|
Recorder: eventBroadcaster.NewRecorder(api.EventSource{Component: "scheduler"}),
|
||||||
|
}
|
||||||
|
|
||||||
|
// First scheduling pass should schedule the pod
|
||||||
|
s := New(c)
|
||||||
|
called := make(chan struct{})
|
||||||
|
events := eventBroadcaster.StartEventWatcher(func(e *api.Event) {
|
||||||
|
if e, a := "scheduled", e.Reason; e != a {
|
||||||
|
t.Errorf("expected %v, got %v", e, a)
|
||||||
|
}
|
||||||
|
close(called)
|
||||||
|
})
|
||||||
|
|
||||||
|
queuedPodStore.Add(firstPod)
|
||||||
|
// queuedPodStore: [foo:8080]
|
||||||
|
// scheduledPodStore: []
|
||||||
|
// assumedPods: []
|
||||||
|
|
||||||
|
s.scheduleOne()
|
||||||
|
// queuedPodStore: []
|
||||||
|
// scheduledPodStore: [foo:8080]
|
||||||
|
// assumedPods: [foo:8080]
|
||||||
|
|
||||||
|
pod, exists, _ := scheduledPodStore.GetByKey("foo")
|
||||||
|
if !exists {
|
||||||
|
t.Errorf("Expected scheduled pod store to contain pod")
|
||||||
|
}
|
||||||
|
pod, exists, _ = queuedPodStore.GetByKey("foo")
|
||||||
|
if exists {
|
||||||
|
t.Errorf("Did not expect a queued pod, found %+v", pod)
|
||||||
|
}
|
||||||
|
pod, exists, _ = assumedPodsStore.GetByKey("foo")
|
||||||
|
if !exists {
|
||||||
|
t.Errorf("Assumed pod store should contain stale pod")
|
||||||
|
}
|
||||||
|
|
||||||
|
expectBind := &api.Binding{
|
||||||
|
ObjectMeta: api.ObjectMeta{Name: "foo"},
|
||||||
|
Target: api.ObjectReference{Kind: "Node", Name: "machine1"},
|
||||||
|
}
|
||||||
|
if ex, ac := expectBind, gotBinding; !reflect.DeepEqual(ex, ac) {
|
||||||
|
t.Errorf("Expected exact match on binding: %s", util.ObjectDiff(ex, ac))
|
||||||
|
}
|
||||||
|
|
||||||
|
<-called
|
||||||
|
events.Stop()
|
||||||
|
|
||||||
|
scheduledPodStore.Delete(pod)
|
||||||
|
_, exists, _ = assumedPodsStore.Get(pod)
|
||||||
|
if !exists {
|
||||||
|
t.Errorf("Expected pod %#v in assumed pod store", pod)
|
||||||
|
}
|
||||||
|
|
||||||
|
secondPod := podWithPort("bar", "", podPort)
|
||||||
|
queuedPodStore.Add(secondPod)
|
||||||
|
// queuedPodStore: [bar:8080]
|
||||||
|
// scheduledPodStore: []
|
||||||
|
// assumedPods: [foo:8080]
|
||||||
|
|
||||||
|
// Second scheduling pass will fail to schedule if the store hasn't expired
|
||||||
|
// the deleted pod. This would normally happen with a timeout.
|
||||||
|
//expirationPolicy.NeverExpire = util.NewStringSet()
|
||||||
|
fakeClock.Time = fakeClock.Time.Add(ttl + 1)
|
||||||
|
|
||||||
|
called = make(chan struct{})
|
||||||
|
events = eventBroadcaster.StartEventWatcher(func(e *api.Event) {
|
||||||
|
if e, a := "scheduled", e.Reason; e != a {
|
||||||
|
t.Errorf("expected %v, got %v", e, a)
|
||||||
|
}
|
||||||
|
close(called)
|
||||||
|
})
|
||||||
|
|
||||||
|
s.scheduleOne()
|
||||||
|
|
||||||
|
expectBind = &api.Binding{
|
||||||
|
ObjectMeta: api.ObjectMeta{Name: "bar"},
|
||||||
|
Target: api.ObjectReference{Kind: "Node", Name: "machine1"},
|
||||||
|
}
|
||||||
|
if ex, ac := expectBind, gotBinding; !reflect.DeepEqual(ex, ac) {
|
||||||
|
t.Errorf("Expected exact match on binding: %s", util.ObjectDiff(ex, ac))
|
||||||
|
}
|
||||||
|
<-called
|
||||||
|
events.Stop()
|
||||||
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user