diff --git a/plugin/pkg/scheduler/schedulercache/cache.go b/plugin/pkg/scheduler/schedulercache/cache.go
new file mode 100644
index 00000000000..11f75988479
--- /dev/null
+++ b/plugin/pkg/scheduler/schedulercache/cache.go
@@ -0,0 +1,264 @@
+/*
+Copyright 2015 The Kubernetes Authors All rights reserved.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package schedulercache
+
+import (
+	"fmt"
+	"sync"
+	"time"
+
+	"github.com/golang/glog"
+	"k8s.io/kubernetes/pkg/api"
+	"k8s.io/kubernetes/pkg/labels"
+	"k8s.io/kubernetes/pkg/util/wait"
+)
+
+var (
+	cleanAssumedPeriod = 1 * time.Second
+)
+
+// New returns a Cache implementation.
+// It automatically starts a goroutine that manages expiration of assumed pods.
+// "ttl" is how long an assumed pod stays in the cache before it expires.
+// Closing the "stop" channel shuts down the background goroutine.
+func New(ttl time.Duration, stop chan struct{}) Cache {
+	cache := newSchedulerCache(ttl, cleanAssumedPeriod, stop)
+	cache.run()
+	return cache
+}
+
+type schedulerCache struct {
+	stop   chan struct{}
+	ttl    time.Duration
+	period time.Duration
+
+	// This mutex guards all fields within this cache struct.
+	mu sync.Mutex
+	// a set of assumed pod keys.
+	// The key could further be used to get an entry in podStates.
+	assumedPods map[string]bool
+	// a map from pod key to podState.
+	podStates map[string]*podState
+	nodes     map[string]*NodeInfo
+}
+
+type podState struct {
+	pod *api.Pod
+	// Used to determine the expiration time of an assumed pod.
+	deadline time.Time
+}
+
+func newSchedulerCache(ttl, period time.Duration, stop chan struct{}) *schedulerCache {
+	return &schedulerCache{
+		ttl:    ttl,
+		period: period,
+		stop:   stop,
+
+		nodes:       make(map[string]*NodeInfo),
+		assumedPods: make(map[string]bool),
+		podStates:   make(map[string]*podState),
+	}
+}
+
+func (cache *schedulerCache) GetNodeNameToInfoMap() (map[string]*NodeInfo, error) {
+	nodeNameToInfo := make(map[string]*NodeInfo)
+	cache.mu.Lock()
+	defer cache.mu.Unlock()
+	for name, info := range cache.nodes {
+		nodeNameToInfo[name] = info.Clone()
+	}
+	return nodeNameToInfo, nil
+}
+
+func (cache *schedulerCache) List(selector labels.Selector) ([]*api.Pod, error) {
+	cache.mu.Lock()
+	defer cache.mu.Unlock()
+	var pods []*api.Pod
+	for _, info := range cache.nodes {
+		for _, pod := range info.pods {
+			if selector.Matches(labels.Set(pod.Labels)) {
+				pods = append(pods, pod)
+			}
+		}
+	}
+	return pods, nil
+}
+
+func (cache *schedulerCache) AssumePodIfBindSucceed(pod *api.Pod, bind func() bool) error {
+	return cache.assumePodIfBindSucceed(pod, bind, time.Now())
+}
+
+// assumePodIfBindSucceed exists to make tests deterministic by taking time as an input argument.
+func (cache *schedulerCache) assumePodIfBindSucceed(pod *api.Pod, bind func() bool, now time.Time) error {
+	cache.mu.Lock()
+	defer cache.mu.Unlock()
+
+	if !bind() {
+		return nil
+	}
+
+	key, err := getPodKey(pod)
+	if err != nil {
+		return err
+	}
+	if _, ok := cache.podStates[key]; ok {
+		return fmt.Errorf("pod state wasn't initial but got assumed. Pod key: %v", key)
+	}
+
+	cache.addPod(pod)
+	ps := &podState{
+		pod:      pod,
+		deadline: now.Add(cache.ttl),
+	}
+	cache.podStates[key] = ps
+	cache.assumedPods[key] = true
+	return nil
+}
+
+func (cache *schedulerCache) AddPod(pod *api.Pod) error {
+	key, err := getPodKey(pod)
+	if err != nil {
+		return err
+	}
+
+	cache.mu.Lock()
+	defer cache.mu.Unlock()
+
+	_, ok := cache.podStates[key]
+	switch {
+	case ok && cache.assumedPods[key]:
+		delete(cache.assumedPods, key)
+	case !ok:
+		// Pod was expired. We should add it back.
+		cache.addPod(pod)
+	default:
+		return fmt.Errorf("pod was already in added state. Pod key: %v", key)
+	}
+	return nil
+}
+
+func (cache *schedulerCache) UpdatePod(oldPod, newPod *api.Pod) error {
+	key, err := getPodKey(oldPod)
+	if err != nil {
+		return err
+	}
+
+	cache.mu.Lock()
+	defer cache.mu.Unlock()
+
+	_, ok := cache.podStates[key]
+	switch {
+	// An assumed pod won't have an Update/Remove event. It needs to have an Add event
+	// before an Update event, in which case the state would change from Assumed to Added.
+	case ok && !cache.assumedPods[key]:
+		if err := cache.updatePod(oldPod, newPod); err != nil {
+			return err
+		}
+	default:
+		return fmt.Errorf("pod state wasn't added but got updated. Pod key: %v", key)
+	}
+	return nil
+}
+
+func (cache *schedulerCache) updatePod(oldPod, newPod *api.Pod) error {
+	if err := cache.deletePod(oldPod); err != nil {
+		return err
+	}
+	cache.addPod(newPod)
+	return nil
+}
+
+func (cache *schedulerCache) addPod(pod *api.Pod) {
+	n, ok := cache.nodes[pod.Spec.NodeName]
+	if !ok {
+		n = NewNodeInfo()
+		cache.nodes[pod.Spec.NodeName] = n
+	}
+	n.addPod(pod)
+}
+
+func (cache *schedulerCache) deletePod(pod *api.Pod) error {
+	n := cache.nodes[pod.Spec.NodeName]
+	if err := n.removePod(pod); err != nil {
+		return err
+	}
+	if len(n.pods) == 0 {
+		delete(cache.nodes, pod.Spec.NodeName)
+	}
+	return nil
+}
+
+func (cache *schedulerCache) RemovePod(pod *api.Pod) error {
+	key, err := getPodKey(pod)
+	if err != nil {
+		return err
+	}
+
+	cache.mu.Lock()
+	defer cache.mu.Unlock()
+
+	_, ok := cache.podStates[key]
+	switch {
+	// An assumed pod won't have a Delete/Remove event. It needs to have an Add event
+	// before a Remove event, in which case the state would change from Assumed to Added.
+	case ok && !cache.assumedPods[key]:
+		err := cache.deletePod(pod)
+		if err != nil {
+			return err
+		}
+		delete(cache.podStates, key)
+	default:
+		return fmt.Errorf("pod state wasn't added but got removed. Pod key: %v", key)
+	}
+	return nil
+}
+
+func (cache *schedulerCache) run() {
+	go wait.Until(cache.cleanupExpiredAssumedPods, cache.period, cache.stop)
+}
+
+func (cache *schedulerCache) cleanupExpiredAssumedPods() {
+	cache.cleanupAssumedPods(time.Now())
+}
+
+// cleanupAssumedPods exists to make tests deterministic by taking time as an input argument.
+func (cache *schedulerCache) cleanupAssumedPods(now time.Time) {
+	cache.mu.Lock()
+	defer cache.mu.Unlock()
+
+	// The size of assumedPods should be small.
+	for key := range cache.assumedPods {
+		ps, ok := cache.podStates[key]
+		if !ok {
+			panic("Key found in assumed set but not in podStates. Potentially a logical error.")
+		}
+		if now.After(ps.deadline) {
+			if err := cache.expirePod(key, ps); err != nil {
+				glog.Errorf("expirePod failed for %s: %v", key, err)
+			}
+		}
+	}
+}
+
+func (cache *schedulerCache) expirePod(key string, ps *podState) error {
+	if err := cache.deletePod(ps.pod); err != nil {
+		return err
+	}
+	delete(cache.assumedPods, key)
+	delete(cache.podStates, key)
+	return nil
+}
diff --git a/plugin/pkg/scheduler/schedulercache/cache_test.go b/plugin/pkg/scheduler/schedulercache/cache_test.go
new file mode 100644
index 00000000000..094f5352440
--- /dev/null
+++ b/plugin/pkg/scheduler/schedulercache/cache_test.go
@@ -0,0 +1,482 @@
+/*
+Copyright 2015 The Kubernetes Authors All rights reserved.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package schedulercache
+
+import (
+	"fmt"
+	"reflect"
+	"testing"
+	"time"
+
+	"k8s.io/kubernetes/pkg/api"
+	"k8s.io/kubernetes/pkg/api/resource"
+	"k8s.io/kubernetes/pkg/labels"
+	priorityutil "k8s.io/kubernetes/plugin/pkg/scheduler/algorithm/priorities/util"
+)
+
+// TestAssumePodScheduled tests that after a pod is assumed, its information is aggregated
+// on node level.
+func TestAssumePodScheduled(t *testing.T) {
+	nodeName := "node"
+	testPods := []*api.Pod{
+		makeBasePod(nodeName, "test", "100m", "500", []api.ContainerPort{{HostPort: 80}}),
+		makeBasePod(nodeName, "test-1", "100m", "500", []api.ContainerPort{{HostPort: 80}}),
+		makeBasePod(nodeName, "test-2", "200m", "1Ki", []api.ContainerPort{{HostPort: 8080}}),
+		makeBasePod(nodeName, "test-nonzero", "", "", []api.ContainerPort{{HostPort: 80}}),
+	}
+
+	tests := []struct {
+		pods []*api.Pod
+
+		wNodeInfo *NodeInfo
+	}{{
+		pods: []*api.Pod{testPods[0]},
+		wNodeInfo: &NodeInfo{
+			requestedResource: &Resource{
+				MilliCPU: 100,
+				Memory:   500,
+			},
+			nonzeroRequest: &Resource{
+				MilliCPU: 100,
+				Memory:   500,
+			},
+			pods: []*api.Pod{testPods[0]},
+		},
+	}, {
+		pods: []*api.Pod{testPods[1], testPods[2]},
+		wNodeInfo: &NodeInfo{
+			requestedResource: &Resource{
+				MilliCPU: 300,
+				Memory:   1524,
+			},
+			nonzeroRequest: &Resource{
+				MilliCPU: 300,
+				Memory:   1524,
+			},
+			pods: []*api.Pod{testPods[1], testPods[2]},
+		},
+	}, { // test non-zero request
+		pods: []*api.Pod{testPods[3]},
+		wNodeInfo: &NodeInfo{
+			requestedResource: &Resource{
+				MilliCPU: 0,
+				Memory:   0,
+			},
+			nonzeroRequest: &Resource{
+				MilliCPU: priorityutil.DefaultMilliCpuRequest,
+				Memory:   priorityutil.DefaultMemoryRequest,
+			},
+			pods: []*api.Pod{testPods[3]},
+		},
+	}}
+
+	for i, tt := range tests {
+		cache := newSchedulerCache(time.Second, time.Second, nil)
+		for _, pod := range tt.pods {
+			if err := cache.AssumePodIfBindSucceed(pod, alwaysTrue); err != nil {
+				t.Fatalf("AssumePodIfBindSucceed failed: %v", err)
+			}
+		}
+		n := cache.nodes[nodeName]
+		if !reflect.DeepEqual(n, tt.wNodeInfo) {
+			t.Errorf("#%d: node info get=%s, want=%s", i, n, tt.wNodeInfo)
+		}
+	}
+}
+
+type testExpirePodStruct struct {
+	pod         *api.Pod
+	assumedTime time.Time
+}
+
+// TestExpirePod tests that assumed pods will be removed if expired.
+// The removal will be reflected in node info.
+func TestExpirePod(t *testing.T) {
+	nodeName := "node"
+	testPods := []*api.Pod{
+		makeBasePod(nodeName, "test-1", "100m", "500", []api.ContainerPort{{HostPort: 80}}),
+		makeBasePod(nodeName, "test-2", "200m", "1Ki", []api.ContainerPort{{HostPort: 8080}}),
+	}
+	now := time.Now()
+	ttl := 10 * time.Second
+	tests := []struct {
+		pods        []*testExpirePodStruct
+		cleanupTime time.Time
+
+		wNodeInfo *NodeInfo
+	}{{ // assumed pod would expire
+		pods: []*testExpirePodStruct{
+			{pod: testPods[0], assumedTime: now},
+		},
+		cleanupTime: now.Add(2 * ttl),
+		wNodeInfo:   nil,
+	}, { // first one would expire, second one would not.
+		pods: []*testExpirePodStruct{
+			{pod: testPods[0], assumedTime: now},
+			{pod: testPods[1], assumedTime: now.Add(3 * ttl / 2)},
+		},
+		cleanupTime: now.Add(2 * ttl),
+		wNodeInfo: &NodeInfo{
+			requestedResource: &Resource{
+				MilliCPU: 200,
+				Memory:   1024,
+			},
+			nonzeroRequest: &Resource{
+				MilliCPU: 200,
+				Memory:   1024,
+			},
+			pods: []*api.Pod{testPods[1]},
+		},
+	}}
+
+	for i, tt := range tests {
+		cache := newSchedulerCache(ttl, time.Second, nil)
+
+		for _, pod := range tt.pods {
+			if err := cache.assumePodIfBindSucceed(pod.pod, alwaysTrue, pod.assumedTime); err != nil {
+				t.Fatalf("assumePod failed: %v", err)
+			}
+		}
+		// pods that have assumedTime + ttl < cleanupTime will get expired and removed
+		cache.cleanupAssumedPods(tt.cleanupTime)
+		n := cache.nodes[nodeName]
+		if !reflect.DeepEqual(n, tt.wNodeInfo) {
+			t.Errorf("#%d: node info get=%s, want=%s", i, n, tt.wNodeInfo)
+		}
+	}
+}
+
+// TestAddPodWillConfirm tests that a pod being Add()ed will be confirmed if assumed.
+// The pod info should still exist after manually expiring unconfirmed pods.
+func TestAddPodWillConfirm(t *testing.T) {
+	nodeName := "node"
+	now := time.Now()
+	ttl := 10 * time.Second
+
+	testPods := []*api.Pod{
+		makeBasePod(nodeName, "test-1", "100m", "500", []api.ContainerPort{{HostPort: 80}}),
+		makeBasePod(nodeName, "test-2", "200m", "1Ki", []api.ContainerPort{{HostPort: 8080}}),
+	}
+	tests := []struct {
+		podsToAssume []*api.Pod
+		podsToAdd    []*api.Pod
+
+		wNodeInfo *NodeInfo
+	}{{ // two pods were assumed at the same time. Only the first one is Add()ed and gets confirmed.
+		podsToAssume: []*api.Pod{testPods[0], testPods[1]},
+		podsToAdd:    []*api.Pod{testPods[0]},
+		wNodeInfo: &NodeInfo{
+			requestedResource: &Resource{
+				MilliCPU: 100,
+				Memory:   500,
+			},
+			nonzeroRequest: &Resource{
+				MilliCPU: 100,
+				Memory:   500,
+			},
+			pods: []*api.Pod{testPods[0]},
+		},
+	}}
+
+	for i, tt := range tests {
+		cache := newSchedulerCache(ttl, time.Second, nil)
+		for _, podToAssume := range tt.podsToAssume {
+			if err := cache.assumePodIfBindSucceed(podToAssume, alwaysTrue, now); err != nil {
+				t.Fatalf("assumePod failed: %v", err)
+			}
+		}
+		for _, podToAdd := range tt.podsToAdd {
+			if err := cache.AddPod(podToAdd); err != nil {
+				t.Fatalf("AddPod failed: %v", err)
+			}
+		}
+		cache.cleanupAssumedPods(now.Add(2 * ttl))
+		// check after expiration. confirmed pods shouldn't be expired.
+		n := cache.nodes[nodeName]
+		if !reflect.DeepEqual(n, tt.wNodeInfo) {
+			t.Errorf("#%d: node info get=%s, want=%s", i, n, tt.wNodeInfo)
+		}
+	}
+}
+
+// TestAddPodAfterExpiration tests that a pod being Add()ed will be added back if expired.
+func TestAddPodAfterExpiration(t *testing.T) {
+	nodeName := "node"
+	ttl := 10 * time.Second
+	basePod := makeBasePod(nodeName, "test", "100m", "500", []api.ContainerPort{{HostPort: 80}})
+	tests := []struct {
+		pod *api.Pod
+
+		wNodeInfo *NodeInfo
+	}{{
+		pod: basePod,
+		wNodeInfo: &NodeInfo{
+			requestedResource: &Resource{
+				MilliCPU: 100,
+				Memory:   500,
+			},
+			nonzeroRequest: &Resource{
+				MilliCPU: 100,
+				Memory:   500,
+			},
+			pods: []*api.Pod{basePod},
+		},
+	}}
+
+	now := time.Now()
+	for i, tt := range tests {
+		cache := newSchedulerCache(ttl, time.Second, nil)
+		if err := cache.assumePodIfBindSucceed(tt.pod, alwaysTrue, now); err != nil {
+			t.Fatalf("assumePod failed: %v", err)
+		}
+		cache.cleanupAssumedPods(now.Add(2 * ttl))
+		// It should be expired and removed.
+		n := cache.nodes[nodeName]
+		if n != nil {
+			t.Errorf("#%d: expecting nil node info, but get=%v", i, n)
+		}
+		if err := cache.AddPod(tt.pod); err != nil {
+			t.Fatalf("AddPod failed: %v", err)
+		}
+		// The pod should be added back and no longer subject to expiration.
+		n = cache.nodes[nodeName]
+		if !reflect.DeepEqual(n, tt.wNodeInfo) {
+			t.Errorf("#%d: node info get=%s, want=%s", i, n, tt.wNodeInfo)
+		}
+	}
+}
+
+// TestUpdatePod tests that an added pod can be updated.
+func TestUpdatePod(t *testing.T) {
+	nodeName := "node"
+	ttl := 10 * time.Second
+	testPods := []*api.Pod{
+		makeBasePod(nodeName, "test", "100m", "500", []api.ContainerPort{{HostPort: 80}}),
+		makeBasePod(nodeName, "test", "200m", "1Ki", []api.ContainerPort{{HostPort: 8080}}),
+	}
+	tests := []struct {
+		podsToAssume []*api.Pod
+		podsToAdd    []*api.Pod
+		podsToUpdate []*api.Pod
+
+		wNodeInfo []*NodeInfo
+	}{{ // Pod is assumed and added. Then it would be updated twice.
+		podsToAssume: []*api.Pod{testPods[0]},
+		podsToAdd:    []*api.Pod{testPods[0]},
+		podsToUpdate: []*api.Pod{testPods[0], testPods[1], testPods[0]},
+		wNodeInfo: []*NodeInfo{{
+			requestedResource: &Resource{
+				MilliCPU: 200,
+				Memory:   1024,
+			},
+			nonzeroRequest: &Resource{
+				MilliCPU: 200,
+				Memory:   1024,
+			},
+			pods: []*api.Pod{testPods[1]},
+		}, {
+			requestedResource: &Resource{
+				MilliCPU: 100,
+				Memory:   500,
+			},
+			nonzeroRequest: &Resource{
+				MilliCPU: 100,
+				Memory:   500,
+			},
+			pods: []*api.Pod{testPods[0]},
+		}},
+	}}
+
+	now := time.Now()
+	for _, tt := range tests {
+		cache := newSchedulerCache(ttl, time.Second, nil)
+		for _, podToAssume := range tt.podsToAssume {
+			if err := cache.assumePodIfBindSucceed(podToAssume, alwaysTrue, now); err != nil {
+				t.Fatalf("assumePod failed: %v", err)
+			}
+		}
+		for _, podToAdd := range tt.podsToAdd {
+			if err := cache.AddPod(podToAdd); err != nil {
+				t.Fatalf("AddPod failed: %v", err)
+			}
+		}
+
+		for i := range tt.podsToUpdate {
+			if i == 0 {
+				continue
+			}
+			if err := cache.UpdatePod(tt.podsToUpdate[i-1], tt.podsToUpdate[i]); err != nil {
+				t.Fatalf("UpdatePod failed: %v", err)
+			}
+			// check the node info after each update.
+			n := cache.nodes[nodeName]
+			if !reflect.DeepEqual(n, tt.wNodeInfo[i-1]) {
+				t.Errorf("#%d: node info get=%s, want=%s", i-1, n, tt.wNodeInfo)
+			}
+		}
+	}
+}
+
+// TestRemovePod tests that after an added pod is removed, its information is subtracted from the node.
+func TestRemovePod(t *testing.T) {
+	nodeName := "node"
+	basePod := makeBasePod(nodeName, "test", "100m", "500", []api.ContainerPort{{HostPort: 80}})
+	tests := []struct {
+		pod *api.Pod
+
+		wNodeInfo *NodeInfo
+	}{{
+		pod: basePod,
+		wNodeInfo: &NodeInfo{
+			requestedResource: &Resource{
+				MilliCPU: 100,
+				Memory:   500,
+			},
+			nonzeroRequest: &Resource{
+				MilliCPU: 100,
+				Memory:   500,
+			},
+			pods: []*api.Pod{basePod},
+		},
+	}}
+
+	for i, tt := range tests {
+		cache := newSchedulerCache(time.Second, time.Second, nil)
+		if err := cache.AssumePodIfBindSucceed(tt.pod, alwaysTrue); err != nil {
+			t.Fatalf("assumePod failed: %v", err)
+		}
+		if err := cache.AddPod(tt.pod); err != nil {
+			t.Fatalf("AddPod failed: %v", err)
+		}
+		n := cache.nodes[nodeName]
+		if !reflect.DeepEqual(n, tt.wNodeInfo) {
+			t.Errorf("#%d: node info get=%s, want=%s", i, n, tt.wNodeInfo)
+		}
+
+		if err := cache.RemovePod(tt.pod); err != nil {
+			t.Fatalf("RemovePod failed: %v", err)
+		}
+
+		n = cache.nodes[nodeName]
+		if n != nil {
+			t.Errorf("#%d: expecting pod deleted and nil node info, get=%s", i, n)
+		}
+	}
+}
+
+func BenchmarkGetNodeNameToInfoMap1kNodes30kPods(b *testing.B) {
+	cache := setupCacheOf1kNodes30kPods(b)
+	b.ResetTimer()
+	for n := 0; n < b.N; n++ {
+		cache.GetNodeNameToInfoMap()
+	}
+}
+
+func BenchmarkList1kNodes30kPods(b *testing.B) {
+	cache := setupCacheOf1kNodes30kPods(b)
+	b.ResetTimer()
+	for n := 0; n < b.N; n++ {
+		cache.List(labels.Everything())
+	}
+}
+
+func BenchmarkExpire100Pods(b *testing.B) {
+	benchmarkExpire(b, 100)
+}
+
+func BenchmarkExpire1kPods(b *testing.B) {
+	benchmarkExpire(b, 1000)
+}
+
+func BenchmarkExpire10kPods(b *testing.B) {
+	benchmarkExpire(b, 10000)
+}
+
+func benchmarkExpire(b *testing.B, podNum int) {
+	now := time.Now()
+	for n := 0; n < b.N; n++ {
+		b.StopTimer()
+		cache := setupCacheWithAssumedPods(b, podNum, now)
+		b.StartTimer()
+		cache.cleanupAssumedPods(now.Add(2 * time.Second))
+	}
+}
+
+func makeBasePod(nodeName, objName, cpu, mem string, ports []api.ContainerPort) *api.Pod {
+	req := api.ResourceList{}
+	if cpu != "" {
+		req = api.ResourceList{
+			api.ResourceCPU:    resource.MustParse(cpu),
+			api.ResourceMemory: resource.MustParse(mem),
+		}
+	}
+	return &api.Pod{
+		ObjectMeta: api.ObjectMeta{
+			Namespace: "node_info_cache_test",
+			Name:      objName,
+		},
+		Spec: api.PodSpec{
+			Containers: []api.Container{{
+				Resources: api.ResourceRequirements{
+					Requests: req,
+				},
+				Ports: ports,
+			}},
+			NodeName: nodeName,
+		},
+	}
+}
+
+func setupCacheOf1kNodes30kPods(b *testing.B) Cache {
+	cache := newSchedulerCache(time.Second, time.Second, nil)
+	for i := 0; i < 1000; i++ {
+		nodeName := fmt.Sprintf("node-%d", i)
+		for j := 0; j < 30; j++ {
+			objName := fmt.Sprintf("%s-pod-%d", nodeName, j)
+			pod := makeBasePod(nodeName, objName, "0", "0", nil)
+
+			err := cache.AssumePodIfBindSucceed(pod, alwaysTrue)
+			if err != nil {
+				b.Fatalf("AssumePodIfBindSucceed failed: %v", err)
+			}
+			err = cache.AddPod(pod)
+			if err != nil {
+				b.Fatalf("AddPod failed: %v", err)
+			}
+		}
+	}
+	return cache
+}
+
+func setupCacheWithAssumedPods(b *testing.B, podNum int, assumedTime time.Time) *schedulerCache {
+	cache := newSchedulerCache(time.Second, time.Second, nil)
+	for i := 0; i < podNum; i++ {
+		nodeName := fmt.Sprintf("node-%d", i/10)
+		objName := fmt.Sprintf("%s-pod-%d", nodeName, i%10)
+		pod := makeBasePod(nodeName, objName, "0", "0", nil)
+
+		err := cache.assumePodIfBindSucceed(pod, alwaysTrue, assumedTime)
+		if err != nil {
+			b.Fatalf("assumePodIfBindSucceed failed: %v", err)
+		}
+	}
+	return cache
+}
+
+func alwaysTrue() bool {
+	return true
+}
diff --git a/plugin/pkg/scheduler/schedulercache/interface.go b/plugin/pkg/scheduler/schedulercache/interface.go
new file mode 100644
index 00000000000..c7194f09dc2
--- /dev/null
+++ b/plugin/pkg/scheduler/schedulercache/interface.go
@@ -0,0 +1,81 @@
+/*
+Copyright 2015 The Kubernetes Authors All rights reserved.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package schedulercache
+
+import (
+	"k8s.io/kubernetes/pkg/api"
+	"k8s.io/kubernetes/pkg/labels"
+)
+
+// Cache collects pods' information and provides node-level aggregated information.
+// It's intended for the generic scheduler to do efficient lookup.
+// Cache's operations are pod centric. It incrementally updates itself based on pod events.
+// Pod events are sent over the network, so we don't have guaranteed delivery of all events:
+// we use a Reflector to list and watch from the remote.
+// The Reflector might be slow and do a relist, which would lead to missing events.
+//
+// State Machine of a pod's events in scheduler's cache:
+//
+//                                                     +----+
+//                                                     |    |
+//                                                     |    | Update
+//           Assume                Add                 +    |
+// Initial +---------> Assumed +-----------+---> Added <--+
+//                             +           |          +
+//                             |           |          |
+//                             |       Add |          | Remove
+//                             |           |          |
+//                             |           +          +
+//                             +------> Expired       +----> Deleted
+//                               Expire
+//
+// Note that an assumed pod can expire, because if we haven't received an Add event notifying us
+// for a while, there might be some problems and we shouldn't keep the pod in the cache any longer.
+//
+// Note that "Initial", "Expired", and "Deleted" pods do not actually exist in cache.
+// Based on existing use cases, we are making the following assumptions:
+// - No pod would be assumed twice.
+// - If a pod wasn't added, it wouldn't be removed or updated.
+// - Both "Expired" and "Deleted" are valid end states. In case of some problems, e.g. a network issue,
+//   a pod might have changed its state (e.g. added and deleted) without delivering notification to the cache.
+type Cache interface {
+	// AssumePodIfBindSucceed assumes a pod to be scheduled if binding the pod succeeded.
+	// If binding returns true, the pod's information is aggregated into the designated node.
+	// Note that both binding and assuming are done as one atomic operation from the cache's view.
+	// No other events like Add would happen in between binding and assuming.
+	// We pass in the binding function and let the implementation take care of concurrency control details.
+	// The implementation also decides the policy to expire pods before they are confirmed (i.e. receive an Add event).
+	// After expiration, the pod's information would be subtracted.
+	AssumePodIfBindSucceed(pod *api.Pod, bind func() bool) error
+
+	// AddPod either confirms a pod if it's assumed, or adds it back if it's expired.
+	// If added back, the pod's information would be added again.
+	AddPod(pod *api.Pod) error
+
+	// UpdatePod removes oldPod's information and adds newPod's information.
+	UpdatePod(oldPod, newPod *api.Pod) error
+
+	// RemovePod removes a pod. The pod's information would be subtracted from the assigned node.
+	RemovePod(pod *api.Pod) error
+
+	// GetNodeNameToInfoMap returns a map of node names to node info. The node info contains
+	// aggregated information of pods scheduled (including assumed to be) on this node.
+	GetNodeNameToInfoMap() (map[string]*NodeInfo, error)
+
+	// List lists all cached pods (including assumed ones).
+	List(labels.Selector) ([]*api.Pod, error)
+}
diff --git a/plugin/pkg/scheduler/schedulercache/node_info.go b/plugin/pkg/scheduler/schedulercache/node_info.go
index 746386e0b2d..7f40e556f1c 100644
--- a/plugin/pkg/scheduler/schedulercache/node_info.go
+++ b/plugin/pkg/scheduler/schedulercache/node_info.go
@@ -19,7 +19,11 @@ package schedulercache
 import (
 	"fmt"
 
+	"github.com/golang/glog"
+
 	"k8s.io/kubernetes/pkg/api"
+	clientcache "k8s.io/kubernetes/pkg/client/cache"
+	priorityutil "k8s.io/kubernetes/plugin/pkg/scheduler/algorithm/priorities/util"
 )
 
 var emptyResource = Resource{}
@@ -31,6 +35,7 @@ type NodeInfo struct {
 	// didn't get it as scheduled yet.
 	requestedResource *Resource
 	pods              []*api.Pod
+	nonzeroRequest    *Resource
 }
 
 // Resource is a collection of compute resource.
@@ -45,6 +50,7 @@ type Resource struct {
 func NewNodeInfo(pods ...*api.Pod) *NodeInfo {
 	ni := &NodeInfo{
 		requestedResource: &Resource{},
+		nonzeroRequest:    &Resource{},
 	}
 	for _, pod := range pods {
 		ni.addPod(pod)
@@ -68,29 +74,86 @@ func (n *NodeInfo) RequestedResource() Resource {
 	return *n.requestedResource
 }
 
+// NonZeroRequest returns the aggregated nonzero resource request of pods on this node.
+func (n *NodeInfo) NonZeroRequest() Resource {
+	if n == nil {
+		return emptyResource
+	}
+	return *n.nonzeroRequest
+}
+
+func (n *NodeInfo) Clone() *NodeInfo {
+	pods := append([]*api.Pod(nil), n.pods...)
+	clone := &NodeInfo{
+		requestedResource: &Resource{MilliCPU: n.requestedResource.MilliCPU, Memory: n.requestedResource.Memory}, // copy values, do not alias
+		nonzeroRequest:    &Resource{MilliCPU: n.nonzeroRequest.MilliCPU, Memory: n.nonzeroRequest.Memory},
+		pods:              pods,
+	}
+	return clone
+}
+
 // String returns representation of human readable format of this NodeInfo.
 func (n *NodeInfo) String() string {
 	podKeys := make([]string, len(n.pods))
 	for i, pod := range n.pods {
 		podKeys[i] = pod.Name
 	}
-	return fmt.Sprintf("&NodeInfo{Pods:%v, RequestedResource:%#v}", podKeys, n.requestedResource)
+	return fmt.Sprintf("&NodeInfo{Pods:%v, RequestedResource:%#v, NonZeroRequest: %#v}", podKeys, n.requestedResource, n.nonzeroRequest)
 }
 
 // addPod adds pod information to this NodeInfo.
 func (n *NodeInfo) addPod(pod *api.Pod) {
-	cpu, mem := calculateResource(pod)
+	cpu, mem, non0_cpu, non0_mem := calculateResource(pod)
 	n.requestedResource.MilliCPU += cpu
 	n.requestedResource.Memory += mem
+	n.nonzeroRequest.MilliCPU += non0_cpu
+	n.nonzeroRequest.Memory += non0_mem
 	n.pods = append(n.pods, pod)
 }
 
-func calculateResource(pod *api.Pod) (int64, int64) {
-	var cpu, mem int64
+// removePod subtracts pod information from this NodeInfo.
+func (n *NodeInfo) removePod(pod *api.Pod) error {
+	k1, err := getPodKey(pod)
+	if err != nil {
+		return err
+	}
+
+	cpu, mem, non0_cpu, non0_mem := calculateResource(pod)
+	n.requestedResource.MilliCPU -= cpu
+	n.requestedResource.Memory -= mem
+	n.nonzeroRequest.MilliCPU -= non0_cpu
+	n.nonzeroRequest.Memory -= non0_mem
+
+	for i := range n.pods {
+		k2, err := getPodKey(n.pods[i])
+		if err != nil {
+			glog.Errorf("Cannot get pod key, err: %v", err)
+			continue
+		}
+		if k1 == k2 {
+			// delete the element
+			n.pods[i] = n.pods[len(n.pods)-1]
+			n.pods = n.pods[:len(n.pods)-1]
+			return nil
+		}
+	}
+	return fmt.Errorf("no corresponding pod in pods")
+}
+
+func calculateResource(pod *api.Pod) (cpu int64, mem int64, non0_cpu int64, non0_mem int64) {
 	for _, c := range pod.Spec.Containers {
 		req := c.Resources.Requests
 		cpu += req.Cpu().MilliValue()
 		mem += req.Memory().Value()
+
+		non0_cpu_req, non0_mem_req := priorityutil.GetNonzeroRequests(&req)
+		non0_cpu += non0_cpu_req
+		non0_mem += non0_mem_req
 	}
-	return cpu, mem
+	return
+}
+
+// getPodKey returns the string key of a pod.
+func getPodKey(pod *api.Pod) (string, error) {
+	return clientcache.MetaNamespaceKeyFunc(pod)
 }
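
Reviewer note: a minimal usage sketch of the new Cache interface, not part of the diff. The main() wiring and the inline binder callback are hypothetical; only New, AssumePodIfBindSucceed, AddPod, and RemovePod come from this change.

package main

import (
	"time"

	"k8s.io/kubernetes/pkg/api"
	"k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache"
)

func main() {
	stop := make(chan struct{})
	defer close(stop)

	// Assumed pods expire after 30s unless confirmed by an Add event.
	cache := schedulercache.New(30*time.Second, stop)

	pod := &api.Pod{ObjectMeta: api.ObjectMeta{Namespace: "default", Name: "p"}}
	pod.Spec.NodeName = "node-1"

	// Bind and assume as one atomic step: the pod counts against node-1
	// immediately, before the watch delivers the real Add event.
	_ = cache.AssumePodIfBindSucceed(pod, func() bool {
		// hypothetical binding call; returning false skips assuming
		return true
	})

	// Later, the pod informer's Add handler confirms (or re-adds) the pod.
	_ = cache.AddPod(pod)

	// And its Delete handler subtracts the pod's resources again.
	_ = cache.RemovePod(pod)
}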