From 2369cd64011ff4f16b115d34d5e815c8e676020d Mon Sep 17 00:00:00 2001 From: Yu-Ju Hong Date: Wed, 16 Sep 2015 15:44:51 -0700 Subject: [PATCH] Move node information retreival to nodeManager nodeManager should handle most node object interaction with apiserver. This moves exsiting node-watching and GetNodes() methods to nodeManager. --- pkg/kubelet/kubelet.go | 42 ++----------------------- pkg/kubelet/node_manager.go | 61 ++++++++++++++++++++++++++++++++++--- 2 files changed, 59 insertions(+), 44 deletions(-) diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index 01836c18b84..ece84197f53 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -61,7 +61,6 @@ import ( "k8s.io/kubernetes/pkg/util/bandwidth" utilErrors "k8s.io/kubernetes/pkg/util/errors" "k8s.io/kubernetes/pkg/util/mount" - nodeutil "k8s.io/kubernetes/pkg/util/node" "k8s.io/kubernetes/pkg/util/oom" "k8s.io/kubernetes/pkg/util/procfs" "k8s.io/kubernetes/pkg/util/sets" @@ -198,23 +197,6 @@ func NewMainKubelet( } serviceLister := &cache.StoreToServiceLister{Store: serviceStore} - nodeStore := cache.NewStore(cache.MetaNamespaceKeyFunc) - if kubeClient != nil { - // TODO: cache.NewListWatchFromClient is limited as it takes a client implementation rather - // than an interface. There is no way to construct a list+watcher using resource name. - fieldSelector := fields.Set{client.ObjectNameField: nodeName}.AsSelector() - listWatch := &cache.ListWatch{ - ListFunc: func() (runtime.Object, error) { - return kubeClient.Nodes().List(labels.Everything(), fieldSelector) - }, - WatchFunc: func(resourceVersion string) (watch.Interface, error) { - return kubeClient.Nodes().Watch(labels.Everything(), fieldSelector, resourceVersion) - }, - } - cache.NewReflector(listWatch, &api.Node{}, nodeStore, 0).Run() - } - nodeLister := &cache.StoreToNodeLister{Store: nodeStore} - // TODO: get the real node object of ourself, // and use the real node name and UID. // TODO: what is namespace for node? @@ -260,7 +242,6 @@ func NewMainKubelet( clusterDomain: clusterDomain, clusterDNS: clusterDNS, serviceLister: serviceLister, - nodeLister: nodeLister, runtimeMutex: sync.Mutex{}, runtimeUpThreshold: maxWaitForContainerRuntime, lastTimestampRuntimeUp: time.Time{}, @@ -402,11 +383,6 @@ type serviceLister interface { List() (api.ServiceList, error) } -type nodeLister interface { - List() (machines api.NodeList, err error) - GetNodeInfo(id string) (*api.Node, error) -} - // Kubelet is the main kubelet implementation. type Kubelet struct { hostname string @@ -449,7 +425,6 @@ type Kubelet struct { masterServiceNamespace string serviceLister serviceLister - nodeLister nodeLister // Last timestamp when runtime responded on ping. // Mutex is used to protect this value. @@ -694,13 +669,6 @@ func (kl *Kubelet) listPodsFromDisk() ([]types.UID, error) { return pods, nil } -func (kl *Kubelet) GetNode() (*api.Node, error) { - if kl.standaloneMode { - return nil, errors.New("no node entry for kubelet in standalone mode") - } - return kl.nodeLister.GetNodeInfo(kl.nodeName) -} - // Starts garbage collection threads. func (kl *Kubelet) StartGarbageCollection() { go util.Until(func() { @@ -1682,7 +1650,7 @@ func (kl *Kubelet) matchesNodeSelector(pod *api.Pod) bool { if kl.standaloneMode { return true } - node, err := kl.GetNode() + node, err := kl.nodeManager.GetNode() if err != nil { glog.Errorf("error getting node: %v", err) return true @@ -1981,11 +1949,7 @@ func (kl *Kubelet) GetHostname() string { // Returns host IP or nil in case of error. func (kl *Kubelet) GetHostIP() (net.IP, error) { - node, err := kl.GetNode() - if err != nil { - return nil, fmt.Errorf("cannot get node: %v", err) - } - return nodeutil.GetNodeHostIP(node) + return kl.nodeManager.GetHostIP() } // GetPods returns all pods bound to the kubelet and their spec, and the mirror @@ -2248,7 +2212,7 @@ func (kl *Kubelet) generatePodStatus(pod *api.Pod) (api.PodStatus, error) { podStatus.Conditions = append(podStatus.Conditions, getPodReadyCondition(spec, podStatus.ContainerStatuses)...) if !kl.standaloneMode { - hostIP, err := kl.GetHostIP() + hostIP, err := kl.nodeManager.GetHostIP() if err != nil { glog.V(4).Infof("Cannot get host IP: %v", err) } else { diff --git a/pkg/kubelet/node_manager.go b/pkg/kubelet/node_manager.go index e6c506e398c..589a463fa0c 100644 --- a/pkg/kubelet/node_manager.go +++ b/pkg/kubelet/node_manager.go @@ -16,10 +16,8 @@ limitations under the License. package kubelet -// Note: if you change code in this file, you might need to change code in -// contrib/mesos/pkg/executor/. - import ( + "errors" "fmt" "net" "strings" @@ -31,11 +29,17 @@ import ( "k8s.io/kubernetes/pkg/api" apierrors "k8s.io/kubernetes/pkg/api/errors" "k8s.io/kubernetes/pkg/api/resource" + "k8s.io/kubernetes/pkg/client/cache" "k8s.io/kubernetes/pkg/client/record" client "k8s.io/kubernetes/pkg/client/unversioned" "k8s.io/kubernetes/pkg/cloudprovider" + "k8s.io/kubernetes/pkg/fields" + "k8s.io/kubernetes/pkg/labels" + "k8s.io/kubernetes/pkg/runtime" "k8s.io/kubernetes/pkg/util" + nodeutil "k8s.io/kubernetes/pkg/util/node" "k8s.io/kubernetes/pkg/version" + "k8s.io/kubernetes/pkg/watch" ) const ( @@ -52,13 +56,17 @@ type infoGetter interface { type nodeManager interface { Start() + GetNode() (*api.Node, error) GetPodCIDR() string + GetHostIP() (net.IP, error) } type realNodeManager struct { // apiserver client. client client.Interface + nodeLister nodeLister + // Set to true to have the node register itself with the apiserver. registerNode bool @@ -124,10 +132,17 @@ func newRealNodeManager(client client.Interface, cloud cloudprovider.Interface, } } +type nodeLister interface { + List() (machines api.NodeList, err error) + GetNodeInfo(id string) (*api.Node, error) +} + func (nm *realNodeManager) Start() { - if nm.client != nil { - go util.Until(nm.syncNodeStatus, nm.nodeStatusUpdateFrequency, util.NeverStop) + if nm.client == nil { + return } + nm.setNodeLister() + go util.Until(nm.syncNodeStatus, nm.nodeStatusUpdateFrequency, util.NeverStop) } func (nm *realNodeManager) GetPodCIDR() string { @@ -136,6 +151,42 @@ func (nm *realNodeManager) GetPodCIDR() string { return nm.podCIDR } +func (nm *realNodeManager) GetNode() (*api.Node, error) { + if nm.client == nil { + return nil, errors.New("unable to get node entry because apiserver client is nil") + } + return nm.nodeLister.GetNodeInfo(nm.nodeName) +} + +// Returns host IP or nil in case of error. +func (nm *realNodeManager) GetHostIP() (net.IP, error) { + if nm.client == nil { + return nil, errors.New("unable to get node entry because apiserver client is nil") + } + node, err := nm.GetNode() + if err != nil { + return nil, fmt.Errorf("cannot get node: %v", err) + } + return nodeutil.GetNodeHostIP(node) +} + +func (nm *realNodeManager) setNodeLister() { + nodeStore := cache.NewStore(cache.MetaNamespaceKeyFunc) + // TODO: cache.NewListWatchFromClient is limited as it takes a client implementation rather + // than an interface. There is no way to construct a list+watcher using resource name. + fieldSelector := fields.Set{client.ObjectNameField: nm.nodeName}.AsSelector() + listWatch := &cache.ListWatch{ + ListFunc: func() (runtime.Object, error) { + return nm.client.Nodes().List(labels.Everything(), fieldSelector) + }, + WatchFunc: func(resourceVersion string) (watch.Interface, error) { + return nm.client.Nodes().Watch(labels.Everything(), fieldSelector, resourceVersion) + }, + } + cache.NewReflector(listWatch, &api.Node{}, nodeStore, 0).Run() + nm.nodeLister = &cache.StoreToNodeLister{Store: nodeStore} +} + // syncNodeStatus should be called periodically from a goroutine. // It synchronizes node status to master, registering the kubelet first if // necessary.