diff --git a/pkg/client/cache/listers.go b/pkg/client/cache/listers.go index 997e66d6607..0a164935fee 100644 --- a/pkg/client/cache/listers.go +++ b/pkg/client/cache/listers.go @@ -138,23 +138,6 @@ func (s storeToNodeConditionLister) List() (nodes api.NodeList, err error) { return } -// TODO Move this back to scheduler as a helper function that takes a Store, -// rather than a method of StoreToNodeLister. -// GetNodeInfo returns cached data for the node 'id'. -func (s *StoreToNodeLister) GetNodeInfo(id string) (*api.Node, error) { - node, exists, err := s.Get(&api.Node{ObjectMeta: api.ObjectMeta{Name: id}}) - - if err != nil { - return nil, fmt.Errorf("error retrieving node '%v' from cache: %v", id, err) - } - - if !exists { - return nil, fmt.Errorf("node '%v' is not in cache", id) - } - - return node.(*api.Node), nil -} - // StoreToReplicationControllerLister gives a store List and Exists methods. The store must contain only ReplicationControllers. type StoreToReplicationControllerLister struct { Store diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index 09e2bb7a89f..5dab21756c2 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -263,6 +263,7 @@ func NewMainKubelet( cache.NewReflector(listWatch, &api.Node{}, nodeStore, 0).Run() } nodeLister := &cache.StoreToNodeLister{Store: nodeStore} + nodeInfo := &predicates.CachedNodeInfo{nodeLister} // TODO: get the real node object of ourself, // and use the real node name and UID. @@ -301,6 +302,7 @@ func NewMainKubelet( clusterDNS: clusterDNS, serviceLister: serviceLister, nodeLister: nodeLister, + nodeInfo: nodeInfo, masterServiceNamespace: masterServiceNamespace, streamingConnectionIdleTimeout: streamingConnectionIdleTimeout, recorder: recorder, @@ -473,7 +475,6 @@ type serviceLister interface { type nodeLister interface { List() (machines api.NodeList, err error) - GetNodeInfo(id string) (*api.Node, error) } // Kubelet is the main kubelet implementation. @@ -527,6 +528,7 @@ type Kubelet struct { masterServiceNamespace string serviceLister serviceLister nodeLister nodeLister + nodeInfo predicates.NodeInfo // a list of node labels to register nodeLabels []string @@ -822,7 +824,7 @@ func (kl *Kubelet) GetNode() (*api.Node, error) { if kl.standaloneMode { return nil, errors.New("no node entry for kubelet in standalone mode") } - return kl.nodeLister.GetNodeInfo(kl.nodeName) + return kl.nodeInfo.GetNodeInfo(kl.nodeName) } // Starts garbage collection threads. diff --git a/pkg/kubelet/kubelet_test.go b/pkg/kubelet/kubelet_test.go index 37d0165394c..38114a578b7 100644 --- a/pkg/kubelet/kubelet_test.go +++ b/pkg/kubelet/kubelet_test.go @@ -112,6 +112,7 @@ func newTestKubelet(t *testing.T) *TestKubelet { kubelet.masterServiceNamespace = api.NamespaceDefault kubelet.serviceLister = testServiceLister{} kubelet.nodeLister = testNodeLister{} + kubelet.nodeInfo = testNodeInfo{} kubelet.recorder = fakeRecorder if err := kubelet.setupDataDirs(); err != nil { t.Fatalf("can't initialize kubelet data dirs: %v", err) @@ -1045,7 +1046,11 @@ type testNodeLister struct { nodes []api.Node } -func (ls testNodeLister) GetNodeInfo(id string) (*api.Node, error) { +type testNodeInfo struct { + nodes []api.Node +} + +func (ls testNodeInfo) GetNodeInfo(id string) (*api.Node, error) { for _, node := range ls.nodes { if node.Name == id { return &node, nil @@ -2319,6 +2324,9 @@ func TestHandleNodeSelector(t *testing.T) { kl.nodeLister = testNodeLister{nodes: []api.Node{ {ObjectMeta: api.ObjectMeta{Name: testKubeletHostname, Labels: map[string]string{"key": "B"}}}, }} + kl.nodeInfo = testNodeInfo{nodes: []api.Node{ + {ObjectMeta: api.ObjectMeta{Name: testKubeletHostname, Labels: map[string]string{"key": "B"}}}, + }} testKubelet.fakeCadvisor.On("MachineInfo").Return(&cadvisorapi.MachineInfo{}, nil) testKubelet.fakeCadvisor.On("DockerImagesFsInfo").Return(cadvisorapiv2.FsInfo{}, nil) testKubelet.fakeCadvisor.On("RootFsInfo").Return(cadvisorapiv2.FsInfo{}, nil) diff --git a/pkg/kubelet/runonce_test.go b/pkg/kubelet/runonce_test.go index 508acd66d31..4f6a16455ec 100644 --- a/pkg/kubelet/runonce_test.go +++ b/pkg/kubelet/runonce_test.go @@ -49,6 +49,7 @@ func TestRunOnce(t *testing.T) { recorder: &record.FakeRecorder{}, cadvisor: cadvisor, nodeLister: testNodeLister{}, + nodeInfo: testNodeInfo{}, statusManager: status.NewManager(nil, podManager), containerRefManager: kubecontainer.NewRefManager(), podManager: podManager, diff --git a/plugin/pkg/scheduler/algorithm/predicates/predicates.go b/plugin/pkg/scheduler/algorithm/predicates/predicates.go index 44747176be5..f93ea54a52f 100644 --- a/plugin/pkg/scheduler/algorithm/predicates/predicates.go +++ b/plugin/pkg/scheduler/algorithm/predicates/predicates.go @@ -20,6 +20,7 @@ import ( "fmt" "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/client/cache" client "k8s.io/kubernetes/pkg/client/unversioned" "k8s.io/kubernetes/pkg/labels" "k8s.io/kubernetes/plugin/pkg/scheduler/algorithm" @@ -52,6 +53,25 @@ func (nodes ClientNodeInfo) GetNodeInfo(nodeID string) (*api.Node, error) { return nodes.Nodes().Get(nodeID) } +type CachedNodeInfo struct { + *cache.StoreToNodeLister +} + +// GetNodeInfo returns cached data for the node 'id'. +func (c *CachedNodeInfo) GetNodeInfo(id string) (*api.Node, error) { + node, exists, err := c.Get(&api.Node{ObjectMeta: api.ObjectMeta{Name: id}}) + + if err != nil { + return nil, fmt.Errorf("error retrieving node '%v' from cache: %v", id, err) + } + + if !exists { + return nil, fmt.Errorf("node '%v' is not in cache", id) + } + + return node.(*api.Node), nil +} + func isVolumeConflict(volume api.Volume, pod *api.Pod) bool { if volume.GCEPersistentDisk != nil { disk := volume.GCEPersistentDisk diff --git a/plugin/pkg/scheduler/factory/factory.go b/plugin/pkg/scheduler/factory/factory.go index d3452726848..dd17f530640 100644 --- a/plugin/pkg/scheduler/factory/factory.go +++ b/plugin/pkg/scheduler/factory/factory.go @@ -36,6 +36,7 @@ import ( "k8s.io/kubernetes/pkg/util/sets" "k8s.io/kubernetes/plugin/pkg/scheduler" "k8s.io/kubernetes/plugin/pkg/scheduler/algorithm" + "k8s.io/kubernetes/plugin/pkg/scheduler/algorithm/predicates" schedulerapi "k8s.io/kubernetes/plugin/pkg/scheduler/api" "k8s.io/kubernetes/plugin/pkg/scheduler/api/validation" @@ -176,7 +177,7 @@ func (f *ConfigFactory) CreateFromKeys(predicateKeys, priorityKeys sets.String, ControllerLister: f.ControllerLister, // All fit predicates only need to consider schedulable nodes. NodeLister: f.NodeLister.NodeCondition(getNodeConditionPredicate()), - NodeInfo: f.NodeLister, + NodeInfo: &predicates.CachedNodeInfo{f.NodeLister}, } predicateFuncs, err := getFitPredicateFunctions(predicateKeys, pluginArgs) if err != nil {