Merge pull request #21016 from hongchaodeng/cache

Auto commit by PR queue bot
k8s-merge-robot 2016-02-20 02:47:53 -08:00
commit 3639e43df2
4 changed files with 895 additions and 5 deletions

@@ -0,0 +1,264 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package schedulercache
import (
"fmt"
"sync"
"time"
"github.com/golang/glog"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/labels"
"k8s.io/kubernetes/pkg/util/wait"
)
var (
cleanAssumedPeriod = 1 * time.Second
)
// New returns a Cache implementation.
// It automatically starts a goroutine that manages expiration of assumed pods.
// "ttl" is how long an assumed pod may stay unconfirmed before it expires.
// "stop" is the channel that, when closed, stops the background goroutine.
func New(ttl time.Duration, stop chan struct{}) Cache {
cache := newSchedulerCache(ttl, cleanAssumedPeriod, stop)
cache.run()
return cache
}
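// Illustrative usage sketch (not part of this change): a caller constructs the cache
// with an assume TTL and tears it down by closing the stop channel. The 30-second TTL
// is an arbitrary value chosen for the example.
func exampleNewUsage() {
    stop := make(chan struct{})
    defer close(stop)
    // The returned Cache expires assumed pods in the background until stop is closed.
    podCache := New(30*time.Second, stop)
    _ = podCache
}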
type schedulerCache struct {
stop chan struct{}
ttl time.Duration
period time.Duration
// This mutex guards all fields within this cache struct.
mu sync.Mutex
// a set of assumed pod keys.
// The key can be used to look up an entry in podStates.
assumedPods map[string]bool
// a map from pod key to podState.
podStates map[string]*podState
nodes map[string]*NodeInfo
}
type podState struct {
pod *api.Pod
// Used by assumedPods to determine expiration.
deadline time.Time
}
func newSchedulerCache(ttl, period time.Duration, stop chan struct{}) *schedulerCache {
return &schedulerCache{
ttl: ttl,
period: period,
stop: stop,
nodes: make(map[string]*NodeInfo),
assumedPods: make(map[string]bool),
podStates: make(map[string]*podState),
}
}
func (cache *schedulerCache) GetNodeNameToInfoMap() (map[string]*NodeInfo, error) {
nodeNameToInfo := make(map[string]*NodeInfo)
cache.mu.Lock()
defer cache.mu.Unlock()
for name, info := range cache.nodes {
nodeNameToInfo[name] = info.Clone()
}
return nodeNameToInfo, nil
}
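// Illustrative sketch (not part of this change): callers take a point-in-time snapshot
// of the per-node aggregates and read it without holding the cache's lock; the Clone()
// calls above are what make that safe.
func exampleSnapshot(cache Cache) error {
    nodeNameToInfo, err := cache.GetNodeNameToInfoMap()
    if err != nil {
        return err
    }
    for name, info := range nodeNameToInfo {
        // The snapshot is a copy, so no further locking is needed while reading it.
        _ = name
        _ = info.RequestedResource()
    }
    return nil
}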
func (cache *schedulerCache) List(selector labels.Selector) ([]*api.Pod, error) {
cache.mu.Lock()
defer cache.mu.Unlock()
var pods []*api.Pod
for _, info := range cache.nodes {
for _, pod := range info.pods {
if selector.Matches(labels.Set(pod.Labels)) {
pods = append(pods, pod)
}
}
}
return pods, nil
}
func (cache *schedulerCache) AssumePodIfBindSucceed(pod *api.Pod, bind func() bool) error {
return cache.assumePodIfBindSucceed(pod, bind, time.Now())
}
// assumePodIfBindSucceed exists to make the tests deterministic by taking time as an input argument.
func (cache *schedulerCache) assumePodIfBindSucceed(pod *api.Pod, bind func() bool, now time.Time) error {
cache.mu.Lock()
defer cache.mu.Unlock()
if !bind() {
return nil
}
key, err := getPodKey(pod)
if err != nil {
return err
}
if _, ok := cache.podStates[key]; ok {
return fmt.Errorf("pod state wasn't initial but get assumed. Pod key: %v", key)
}
cache.addPod(pod)
ps := &podState{
pod: pod,
deadline: now.Add(cache.ttl),
}
cache.podStates[key] = ps
cache.assumedPods[key] = true
return nil
}
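// Illustrative sketch (not part of this change): how a scheduling loop might use the
// bind callback. "binder" is a hypothetical stand-in for the component that issues the
// Binding call against the API server; only its error result matters here.
func assumeAfterBinding(cache Cache, binder interface {
    Bind(*api.Binding) error
}, pod *api.Pod, nodeName string) error {
    return cache.AssumePodIfBindSucceed(pod, func() bool {
        // The bind closure runs while the cache holds its lock, so no Add/Update/Remove
        // event can interleave between binding and assuming.
        err := binder.Bind(&api.Binding{
            ObjectMeta: api.ObjectMeta{Namespace: pod.Namespace, Name: pod.Name},
            Target:     api.ObjectReference{Kind: "Node", Name: nodeName},
        })
        return err == nil
    })
}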
func (cache *schedulerCache) AddPod(pod *api.Pod) error {
key, err := getPodKey(pod)
if err != nil {
return err
}
cache.mu.Lock()
defer cache.mu.Unlock()
_, ok := cache.podStates[key]
switch {
case ok && cache.assumedPods[key]:
delete(cache.assumedPods, key)
case !ok:
// Pod was expired. We should add it back.
cache.addPod(pod)
default:
return fmt.Errorf("pod was already in added state. Pod key: %v", key)
}
return nil
}
func (cache *schedulerCache) UpdatePod(oldPod, newPod *api.Pod) error {
key, err := getPodKey(oldPod)
if err != nil {
return err
}
cache.mu.Lock()
defer cache.mu.Unlock()
_, ok := cache.podStates[key]
switch {
// An assumed pod won't receive Update/Remove events. It must receive an Add event
// before an Update event, at which point its state changes from Assumed to Added.
case ok && !cache.assumedPods[key]:
if err := cache.updatePod(oldPod, newPod); err != nil {
return err
}
default:
return fmt.Errorf("pod state wasn't added but get updated. Pod key: %v", key)
}
return nil
}
func (cache *schedulerCache) updatePod(oldPod, newPod *api.Pod) error {
if err := cache.deletePod(oldPod); err != nil {
return err
}
cache.addPod(newPod)
return nil
}
func (cache *schedulerCache) addPod(pod *api.Pod) {
n, ok := cache.nodes[pod.Spec.NodeName]
if !ok {
n = NewNodeInfo()
cache.nodes[pod.Spec.NodeName] = n
}
n.addPod(pod)
}
func (cache *schedulerCache) deletePod(pod *api.Pod) error {
n := cache.nodes[pod.Spec.NodeName]
if err := n.removePod(pod); err != nil {
return err
}
if len(n.pods) == 0 {
delete(cache.nodes, pod.Spec.NodeName)
}
return nil
}
func (cache *schedulerCache) RemovePod(pod *api.Pod) error {
key, err := getPodKey(pod)
if err != nil {
return err
}
cache.mu.Lock()
defer cache.mu.Unlock()
_, ok := cache.podStates[key]
switch {
// An assumed pod won't receive Delete/Remove events. It must receive an Add event
// before a Remove event, at which point its state changes from Assumed to Added.
case ok && !cache.assumedPods[key]:
err := cache.deletePod(pod)
if err != nil {
return err
}
delete(cache.podStates, key)
default:
return fmt.Errorf("pod state wasn't added but get removed. Pod key: %v", key)
}
return nil
}
func (cache *schedulerCache) run() {
go wait.Until(cache.cleanupExpiredAssumedPods, cache.period, cache.stop)
}
func (cache *schedulerCache) cleanupExpiredAssumedPods() {
cache.cleanupAssumedPods(time.Now())
}
// cleanupAssumedPods exists to make the tests deterministic by taking time as an input argument.
func (cache *schedulerCache) cleanupAssumedPods(now time.Time) {
cache.mu.Lock()
defer cache.mu.Unlock()
// The size of assumedPods should be small
for key := range cache.assumedPods {
ps, ok := cache.podStates[key]
if !ok {
panic("Key found in assumed set but not in podStates. Potentially a logical error.")
}
if now.After(ps.deadline) {
if err := cache.expirePod(key, ps); err != nil {
glog.Errorf(" expirePod failed for %s: %v", key, err)
}
}
}
}
func (cache *schedulerCache) expirePod(key string, ps *podState) error {
if err := cache.deletePod(ps.pod); err != nil {
return err
}
delete(cache.assumedPods, key)
delete(cache.podStates, key)
return nil
}

@@ -0,0 +1,482 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package schedulercache
import (
"fmt"
"reflect"
"testing"
"time"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/resource"
"k8s.io/kubernetes/pkg/labels"
priorityutil "k8s.io/kubernetes/plugin/pkg/scheduler/algorithm/priorities/util"
)
// TestAssumePodScheduled tests that after a pod is assumed, its information is aggregated
// at the node level.
func TestAssumePodScheduled(t *testing.T) {
nodeName := "node"
testPods := []*api.Pod{
makeBasePod(nodeName, "test", "100m", "500", []api.ContainerPort{{HostPort: 80}}),
makeBasePod(nodeName, "test-1", "100m", "500", []api.ContainerPort{{HostPort: 80}}),
makeBasePod(nodeName, "test-2", "200m", "1Ki", []api.ContainerPort{{HostPort: 8080}}),
makeBasePod(nodeName, "test-nonzero", "", "", []api.ContainerPort{{HostPort: 80}}),
}
tests := []struct {
pods []*api.Pod
wNodeInfo *NodeInfo
}{{
pods: []*api.Pod{testPods[0]},
wNodeInfo: &NodeInfo{
requestedResource: &Resource{
MilliCPU: 100,
Memory: 500,
},
nonzeroRequest: &Resource{
MilliCPU: 100,
Memory: 500,
},
pods: []*api.Pod{testPods[0]},
},
}, {
pods: []*api.Pod{testPods[1], testPods[2]},
wNodeInfo: &NodeInfo{
requestedResource: &Resource{
MilliCPU: 300,
Memory: 1524,
},
nonzeroRequest: &Resource{
MilliCPU: 300,
Memory: 1524,
},
pods: []*api.Pod{testPods[1], testPods[2]},
},
}, { // test non-zero request
pods: []*api.Pod{testPods[3]},
wNodeInfo: &NodeInfo{
requestedResource: &Resource{
MilliCPU: 0,
Memory: 0,
},
nonzeroRequest: &Resource{
MilliCPU: priorityutil.DefaultMilliCpuRequest,
Memory: priorityutil.DefaultMemoryRequest,
},
pods: []*api.Pod{testPods[3]},
},
}}
for i, tt := range tests {
cache := newSchedulerCache(time.Second, time.Second, nil)
for _, pod := range tt.pods {
if err := cache.AssumePodIfBindSucceed(pod, alwaysTrue); err != nil {
t.Fatalf("AssumePodScheduled failed: %v", err)
}
}
n := cache.nodes[nodeName]
if !reflect.DeepEqual(n, tt.wNodeInfo) {
t.Errorf("#%d: node info get=%s, want=%s", i, n, tt.wNodeInfo)
}
}
}
type testExpirePodStruct struct {
pod *api.Pod
assumedTime time.Time
}
// TestExpirePod tests that assumed pods will be removed if expired.
// The removal will be reflected in node info.
func TestExpirePod(t *testing.T) {
nodeName := "node"
testPods := []*api.Pod{
makeBasePod(nodeName, "test-1", "100m", "500", []api.ContainerPort{{HostPort: 80}}),
makeBasePod(nodeName, "test-2", "200m", "1Ki", []api.ContainerPort{{HostPort: 8080}}),
}
now := time.Now()
ttl := 10 * time.Second
tests := []struct {
pods []*testExpirePodStruct
cleanupTime time.Time
wNodeInfo *NodeInfo
}{{ // the assumed pod would expire
pods: []*testExpirePodStruct{
{pod: testPods[0], assumedTime: now},
},
cleanupTime: now.Add(2 * ttl),
wNodeInfo: nil,
}, { // first one would expire, second one would not.
pods: []*testExpirePodStruct{
{pod: testPods[0], assumedTime: now},
{pod: testPods[1], assumedTime: now.Add(3 * ttl / 2)},
},
cleanupTime: now.Add(2 * ttl),
wNodeInfo: &NodeInfo{
requestedResource: &Resource{
MilliCPU: 200,
Memory: 1024,
},
nonzeroRequest: &Resource{
MilliCPU: 200,
Memory: 1024,
},
pods: []*api.Pod{testPods[1]},
},
}}
for i, tt := range tests {
cache := newSchedulerCache(ttl, time.Second, nil)
for _, pod := range tt.pods {
if err := cache.assumePodIfBindSucceed(pod.pod, alwaysTrue, pod.assumedTime); err != nil {
t.Fatalf("assumePod failed: %v", err)
}
}
// pods that have assumedTime + ttl < cleanupTime will get expired and removed
cache.cleanupAssumedPods(tt.cleanupTime)
n := cache.nodes[nodeName]
if !reflect.DeepEqual(n, tt.wNodeInfo) {
t.Errorf("#%d: node info get=%s, want=%s", i, n, tt.wNodeInfo)
}
}
}
// TestAddPodWillConfirm tests that a pod being Add()ed will be confirmed if assumed.
// The pod info should still exist after manually expiring unconfirmed pods.
func TestAddPodWillConfirm(t *testing.T) {
nodeName := "node"
now := time.Now()
ttl := 10 * time.Second
testPods := []*api.Pod{
makeBasePod(nodeName, "test-1", "100m", "500", []api.ContainerPort{{HostPort: 80}}),
makeBasePod(nodeName, "test-2", "200m", "1Ki", []api.ContainerPort{{HostPort: 8080}}),
}
tests := []struct {
podsToAssume []*api.Pod
podsToAdd []*api.Pod
wNodeInfo *NodeInfo
}{{ // two pods were assumed at the same time. But only the first one is Add()ed and gets confirmed.
podsToAssume: []*api.Pod{testPods[0], testPods[1]},
podsToAdd: []*api.Pod{testPods[0]},
wNodeInfo: &NodeInfo{
requestedResource: &Resource{
MilliCPU: 100,
Memory: 500,
},
nonzeroRequest: &Resource{
MilliCPU: 100,
Memory: 500,
},
pods: []*api.Pod{testPods[0]},
},
}}
for i, tt := range tests {
cache := newSchedulerCache(ttl, time.Second, nil)
for _, podToAssume := range tt.podsToAssume {
if err := cache.assumePodIfBindSucceed(podToAssume, alwaysTrue, now); err != nil {
t.Fatalf("assumePod failed: %v", err)
}
}
for _, podToAdd := range tt.podsToAdd {
if err := cache.AddPod(podToAdd); err != nil {
t.Fatalf("AddPod failed: %v", err)
}
}
cache.cleanupAssumedPods(now.Add(2 * ttl))
// check after expiration. confirmed pods shouldn't be expired.
n := cache.nodes[nodeName]
if !reflect.DeepEqual(n, tt.wNodeInfo) {
t.Errorf("#%d: node info get=%s, want=%s", i, n, tt.wNodeInfo)
}
}
}
// TestAddPodAfterExpiration tests that a pod being Add()ed will be added back if expired.
func TestAddPodAfterExpiration(t *testing.T) {
nodeName := "node"
ttl := 10 * time.Second
basePod := makeBasePod(nodeName, "test", "100m", "500", []api.ContainerPort{{HostPort: 80}})
tests := []struct {
pod *api.Pod
wNodeInfo *NodeInfo
}{{
pod: basePod,
wNodeInfo: &NodeInfo{
requestedResource: &Resource{
MilliCPU: 100,
Memory: 500,
},
nonzeroRequest: &Resource{
MilliCPU: 100,
Memory: 500,
},
pods: []*api.Pod{basePod},
},
}}
now := time.Now()
for i, tt := range tests {
cache := newSchedulerCache(ttl, time.Second, nil)
if err := cache.assumePodIfBindSucceed(tt.pod, alwaysTrue, now); err != nil {
t.Fatalf("assumePod failed: %v", err)
}
cache.cleanupAssumedPods(now.Add(2 * ttl))
// It should be expired and removed.
n := cache.nodes[nodeName]
if n != nil {
t.Errorf("#%d: expecting nil node info, but get=%v", i, n)
}
if err := cache.AddPod(tt.pod); err != nil {
t.Fatalf("AddPod failed: %v", err)
}
// check that the expired pod has been added back and its information restored.
n = cache.nodes[nodeName]
if !reflect.DeepEqual(n, tt.wNodeInfo) {
t.Errorf("#%d: node info get=%s, want=%s", i, n, tt.wNodeInfo)
}
}
}
// TestUpdatePod tests that a pod will be updated if it has been added before.
func TestUpdatePod(t *testing.T) {
nodeName := "node"
ttl := 10 * time.Second
testPods := []*api.Pod{
makeBasePod(nodeName, "test", "100m", "500", []api.ContainerPort{{HostPort: 80}}),
makeBasePod(nodeName, "test", "200m", "1Ki", []api.ContainerPort{{HostPort: 8080}}),
}
tests := []struct {
podsToAssume []*api.Pod
podsToAdd []*api.Pod
podsToUpdate []*api.Pod
wNodeInfo []*NodeInfo
}{{ // Pod is assumed and added. Then it would be updated twice.
podsToAssume: []*api.Pod{testPods[0]},
podsToAdd: []*api.Pod{testPods[0]},
podsToUpdate: []*api.Pod{testPods[0], testPods[1], testPods[0]},
wNodeInfo: []*NodeInfo{{
requestedResource: &Resource{
MilliCPU: 200,
Memory: 1024,
},
nonzeroRequest: &Resource{
MilliCPU: 200,
Memory: 1024,
},
pods: []*api.Pod{testPods[1]},
}, {
requestedResource: &Resource{
MilliCPU: 100,
Memory: 500,
},
nonzeroRequest: &Resource{
MilliCPU: 100,
Memory: 500,
},
pods: []*api.Pod{testPods[0]},
}},
}}
now := time.Now()
for _, tt := range tests {
cache := newSchedulerCache(ttl, time.Second, nil)
for _, podToAssume := range tt.podsToAssume {
if err := cache.assumePodIfBindSucceed(podToAssume, alwaysTrue, now); err != nil {
t.Fatalf("assumePod failed: %v", err)
}
}
for _, podToAdd := range tt.podsToAdd {
if err := cache.AddPod(podToAdd); err != nil {
t.Fatalf("AddPod failed: %v", err)
}
}
for i := range tt.podsToUpdate {
if i == 0 {
continue
}
if err := cache.UpdatePod(tt.podsToUpdate[i-1], tt.podsToUpdate[i]); err != nil {
t.Fatalf("UpdatePod failed: %v", err)
}
// check the node info after each update
n := cache.nodes[nodeName]
if !reflect.DeepEqual(n, tt.wNodeInfo[i-1]) {
t.Errorf("#%d: node info get=%s, want=%s", i-1, n, tt.wNodeInfo[i-1])
}
}
}
}
// TestRemovePod tests that after an added pod is removed, its information is subtracted from the node info.
func TestRemovePod(t *testing.T) {
nodeName := "node"
basePod := makeBasePod(nodeName, "test", "100m", "500", []api.ContainerPort{{HostPort: 80}})
tests := []struct {
pod *api.Pod
wNodeInfo *NodeInfo
}{{
pod: basePod,
wNodeInfo: &NodeInfo{
requestedResource: &Resource{
MilliCPU: 100,
Memory: 500,
},
nonzeroRequest: &Resource{
MilliCPU: 100,
Memory: 500,
},
pods: []*api.Pod{basePod},
},
}}
for i, tt := range tests {
cache := newSchedulerCache(time.Second, time.Second, nil)
if err := cache.AssumePodIfBindSucceed(tt.pod, alwaysTrue); err != nil {
t.Fatalf("assumePod failed: %v", err)
}
if err := cache.AddPod(tt.pod); err != nil {
t.Fatalf("AddPod failed: %v", err)
}
n := cache.nodes[nodeName]
if !reflect.DeepEqual(n, tt.wNodeInfo) {
t.Errorf("#%d: node info get=%s, want=%s", i, n, tt.wNodeInfo)
}
if err := cache.RemovePod(tt.pod); err != nil {
t.Fatalf("RemovePod failed: %v", err)
}
n = cache.nodes[nodeName]
if n != nil {
t.Errorf("#%d: expecting pod deleted and nil node info, get=%s", i, n)
}
}
}
func BenchmarkGetNodeNameToInfoMap1kNodes30kPods(b *testing.B) {
cache := setupCacheOf1kNodes30kPods(b)
b.ResetTimer()
for n := 0; n < b.N; n++ {
cache.GetNodeNameToInfoMap()
}
}
func BenchmarkList1kNodes30kPods(b *testing.B) {
cache := setupCacheOf1kNodes30kPods(b)
b.ResetTimer()
for n := 0; n < b.N; n++ {
cache.List(labels.Everything())
}
}
func BenchmarkExpire100Pods(b *testing.B) {
benchmarkExpire(b, 100)
}
func BenchmarkExpire1kPods(b *testing.B) {
benchmarkExpire(b, 1000)
}
func BenchmarkExpire10kPods(b *testing.B) {
benchmarkExpire(b, 10000)
}
func benchmarkExpire(b *testing.B, podNum int) {
now := time.Now()
for n := 0; n < b.N; n++ {
b.StopTimer()
cache := setupCacheWithAssumedPods(b, podNum, now)
b.StartTimer()
cache.cleanupAssumedPods(now.Add(2 * time.Second))
}
}
func makeBasePod(nodeName, objName, cpu, mem string, ports []api.ContainerPort) *api.Pod {
req := api.ResourceList{}
if cpu != "" {
req = api.ResourceList{
api.ResourceCPU: resource.MustParse(cpu),
api.ResourceMemory: resource.MustParse(mem),
}
}
return &api.Pod{
ObjectMeta: api.ObjectMeta{
Namespace: "node_info_cache_test",
Name: objName,
},
Spec: api.PodSpec{
Containers: []api.Container{{
Resources: api.ResourceRequirements{
Requests: req,
},
Ports: ports,
}},
NodeName: nodeName,
},
}
}
func setupCacheOf1kNodes30kPods(b *testing.B) Cache {
cache := newSchedulerCache(time.Second, time.Second, nil)
for i := 0; i < 1000; i++ {
nodeName := fmt.Sprintf("node-%d", i)
for j := 0; j < 30; j++ {
objName := fmt.Sprintf("%s-pod-%d", nodeName, j)
pod := makeBasePod(nodeName, objName, "0", "0", nil)
err := cache.AssumePodIfBindSucceed(pod, alwaysTrue)
if err != nil {
b.Fatalf("AssumePodIfBindSucceed failed: %v", err)
}
err = cache.AddPod(pod)
if err != nil {
b.Fatalf("AddPod failed: %v", err)
}
}
}
return cache
}
func setupCacheWithAssumedPods(b *testing.B, podNum int, assumedTime time.Time) *schedulerCache {
cache := newSchedulerCache(time.Second, time.Second, nil)
for i := 0; i < podNum; i++ {
nodeName := fmt.Sprintf("node-%d", i/10)
objName := fmt.Sprintf("%s-pod-%d", nodeName, i%10)
pod := makeBasePod(nodeName, objName, "0", "0", nil)
err := cache.assumePodIfBindSucceed(pod, alwaysTrue, assumedTime)
if err != nil {
b.Fatalf("assumePodIfBindSucceed failed: %v", err)
}
}
return cache
}
func alwaysTrue() bool {
return true
}

@@ -0,0 +1,81 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package schedulercache
import (
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/labels"
)
// Cache collects pods' information and provides node-level aggregated information.
// It's intended for the generic scheduler to do efficient lookups.
// Cache's operations are pod-centric. It incrementally updates itself based on pod events.
// Pod events are sent over the network, so we don't have guaranteed delivery of all events:
// we use a Reflector to list and watch from the remote server.
// The Reflector might be slow and do a relist, which can lead to missing events.
//
// State Machine of a pod's events in scheduler's cache:
//
//                                            +--------+
//                                            |        |
//                                            |        | Update
//            Assume                Add       v        |
//  Initial +--------> Assumed +------------> Added <--+
//                        |                   ^   |
//                 Expire |               Add |   | Remove
//                        v                   |   v
//                     Expired +--------------+ Deleted
//
// Note that an assumed pod can expire: if we haven't received an Add event for it within the TTL,
// something has likely gone wrong and we shouldn't keep the pod in the cache any longer.
//
// Note that "Initial", "Expired", and "Deleted" pods do not actually exist in cache.
// Based on existing use cases, we are making the following assumptions:
// - No pod would be assumed twice
// - If a pod wasn't added, it wouldn't be removed or updated.
// - Both "Expired" and "Deleted" are valid end states. In case of some problems, e.g. network issue,
// a pod might have changed its state (e.g. added and deleted) without delivering notification to the cache.
type Cache interface {
// AssumePodIfBindSucceed assumes a pod as scheduled if binding the pod succeeded.
// If bind returns true, the pod's information is aggregated into the designated node.
// Note that both binding and assuming are done as one atomic operation from the cache's point of view:
// no other event (such as Add) can happen between binding and assuming.
// We pass in the binding function and let the implementation take care of concurrency control.
// The implementation also decides the policy for expiring a pod before it is confirmed (i.e. before its Add event is received).
// After expiration, the pod's information is subtracted.
AssumePodIfBindSucceed(pod *api.Pod, bind func() bool) error
// AddPod either confirms a pod if it's assumed, or adds it back if it's expired.
// If added back, the pod's information would be added again.
AddPod(pod *api.Pod) error
// UpdatePod removes oldPod's information and adds newPod's information.
UpdatePod(oldPod, newPod *api.Pod) error
// RemovePod removes a pod. The pod's information is subtracted from the assigned node.
RemovePod(pod *api.Pod) error
// GetNodeNameToInfoMap returns a map of node names to node info. The node info contains
// aggregated information of pods scheduled (including those assumed to be scheduled) on this node.
GetNodeNameToInfoMap() (map[string]*NodeInfo, error)
// List lists all cached pods (including assumed ones).
List(labels.Selector) ([]*api.Pod, error)
}
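// Illustrative sketch (not part of this change): the typical call sequence an event
// handler would drive against the Cache, following the state machine above. The pods
// and the always-successful bind are placeholders.
func exampleEventFlow(cache Cache, oldPod, newPod *api.Pod) error {
    // Initial -> Assumed: binding succeeded, so the pod is assumed onto its node.
    if err := cache.AssumePodIfBindSucceed(oldPod, func() bool { return true }); err != nil {
        return err
    }
    // Assumed -> Added: the watch delivered the Add event, confirming the pod.
    if err := cache.AddPod(oldPod); err != nil {
        return err
    }
    // Added -> Added: an update replaces the old pod's aggregated information.
    if err := cache.UpdatePod(oldPod, newPod); err != nil {
        return err
    }
    // Added -> Deleted: the pod's information is subtracted from its node.
    return cache.RemovePod(newPod)
}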

@@ -19,7 +19,11 @@ package schedulercache
import (
"fmt"
"github.com/golang/glog"
"k8s.io/kubernetes/pkg/api"
clientcache "k8s.io/kubernetes/pkg/client/cache"
priorityutil "k8s.io/kubernetes/plugin/pkg/scheduler/algorithm/priorities/util"
)
var emptyResource = Resource{}
@@ -31,6 +35,7 @@ type NodeInfo struct {
// didn't get it as scheduled yet.
requestedResource *Resource
pods []*api.Pod
nonzeroRequest *Resource
}
// Resource is a collection of compute resource.
@@ -45,6 +50,7 @@ type Resource struct {
func NewNodeInfo(pods ...*api.Pod) *NodeInfo {
ni := &NodeInfo{
requestedResource: &Resource{},
nonzeroRequest: &Resource{},
}
for _, pod := range pods {
ni.addPod(pod)
@@ -68,29 +74,86 @@ func (n *NodeInfo) RequestedResource() Resource {
return *n.requestedResource
}
// NonZeroRequest returns the aggregated nonzero resource requests of pods on this node.
func (n *NodeInfo) NonZeroRequest() Resource {
if n == nil {
return emptyResource
}
return *n.nonzeroRequest
}
func (n *NodeInfo) Clone() *NodeInfo {
pods := append([]*api.Pod(nil), n.pods...)
// Copy the Resource values rather than the pointers: &(*ptr) is just ptr in Go,
// which would leave the clone sharing (and racing on) the original's counters.
requested := *n.requestedResource
nonzero := *n.nonzeroRequest
clone := &NodeInfo{
requestedResource: &requested,
nonzeroRequest: &nonzero,
pods: pods,
}
return clone
}
// String returns a human-readable representation of this NodeInfo.
func (n *NodeInfo) String() string {
podKeys := make([]string, len(n.pods))
for i, pod := range n.pods {
podKeys[i] = pod.Name
}
return fmt.Sprintf("&NodeInfo{Pods:%v, RequestedResource:%#v}", podKeys, n.requestedResource)
return fmt.Sprintf("&NodeInfo{Pods:%v, RequestedResource:%#v, NonZeroRequest: %#v}", podKeys, n.requestedResource, n.nonzeroRequest)
}
// addPod adds pod information to this NodeInfo.
func (n *NodeInfo) addPod(pod *api.Pod) {
cpu, mem := calculateResource(pod)
cpu, mem, non0_cpu, non0_mem := calculateResource(pod)
n.requestedResource.MilliCPU += cpu
n.requestedResource.Memory += mem
n.nonzeroRequest.MilliCPU += non0_cpu
n.nonzeroRequest.Memory += non0_mem
n.pods = append(n.pods, pod)
}
func calculateResource(pod *api.Pod) (int64, int64) {
var cpu, mem int64
// removePod subtracts pod information from this NodeInfo.
func (n *NodeInfo) removePod(pod *api.Pod) error {
k1, err := getPodKey(pod)
if err != nil {
return err
}
cpu, mem, non0_cpu, non0_mem := calculateResource(pod)
n.requestedResource.MilliCPU -= cpu
n.requestedResource.Memory -= mem
n.nonzeroRequest.MilliCPU -= non0_cpu
n.nonzeroRequest.Memory -= non0_mem
for i := range n.pods {
k2, err := getPodKey(n.pods[i])
if err != nil {
glog.Errorf("Cannot get pod key, err: %v", err)
continue
}
if k1 == k2 {
// delete the element
n.pods[i] = n.pods[len(n.pods)-1]
n.pods = n.pods[:len(n.pods)-1]
return nil
}
}
return fmt.Errorf("no corresponding pod in pods")
}
func calculateResource(pod *api.Pod) (cpu int64, mem int64, non0_cpu int64, non0_mem int64) {
for _, c := range pod.Spec.Containers {
req := c.Resources.Requests
cpu += req.Cpu().MilliValue()
mem += req.Memory().Value()
non0_cpu_req, non0_mem_req := priorityutil.GetNonzeroRequests(&req)
non0_cpu += non0_cpu_req
non0_mem += non0_mem_req
}
return cpu, mem
return
}
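// Worked example (illustrative only, not part of this change): a pod whose containers
// declare no resource requests contributes nothing to requestedResource, but
// priorityutil.GetNonzeroRequests substitutes the scheduler's defaults, so nonzeroRequest
// still grows. This mirrors the "test-nonzero" case in the tests above.
func exampleNonzeroAccounting() {
    pod := &api.Pod{
        Spec: api.PodSpec{
            Containers: []api.Container{{}}, // one container with empty resource requests
        },
    }
    cpu, mem, non0_cpu, non0_mem := calculateResource(pod)
    // cpu == 0 and mem == 0, while non0_cpu and non0_mem carry the nonzero defaults.
    _, _, _, _ = cpu, mem, non0_cpu, non0_mem
}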
// getPodKey returns the string key of a pod.
func getPodKey(pod *api.Pod) (string, error) {
return clientcache.MetaNamespaceKeyFunc(pod)
}