diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index ae695aabd6a..8fdc7fbe09b 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -124,6 +124,9 @@ const ( // Max amount of time to wait for the container runtime to come up. maxWaitForContainerRuntime = 30 * time.Second + // Max amount of time to wait for node list/watch to initially sync + maxWaitForAPIServerSync = 10 * time.Second + // nodeStatusUpdateRetry specifies how many times kubelet retries when posting node status failed. nodeStatusUpdateRetry = 5 @@ -431,14 +434,31 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration, serviceHasSynced = func() bool { return true } } - nodeIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{}) + var nodeHasSynced cache.InformerSynced + var nodeLister corelisters.NodeLister + if kubeDeps.KubeClient != nil { - fieldSelector := fields.Set{api.ObjectNameField: string(nodeName)}.AsSelector() - nodeLW := cache.NewListWatchFromClient(kubeDeps.KubeClient.CoreV1().RESTClient(), "nodes", metav1.NamespaceAll, fieldSelector) - r := cache.NewReflector(nodeLW, &v1.Node{}, nodeIndexer, 0) - go r.Run(wait.NeverStop) + kubeInformers := informers.NewSharedInformerFactoryWithOptions(kubeDeps.KubeClient, 0, informers.WithTweakListOptions(func(options *metav1.ListOptions) { + options.FieldSelector = fields.Set{api.ObjectNameField: string(nodeName)}.String() + })) + nodeLister = kubeInformers.Core().V1().Nodes().Lister() + nodeHasSynced = func() bool { + if kubeInformers.Core().V1().Nodes().Informer().HasSynced() { + klog.Infof("kubelet nodes sync") + return true + } + klog.Infof("kubelet nodes not sync") + return false + } + kubeInformers.Start(wait.NeverStop) + klog.Info("Kubelet client is not nil") + } else { + // we dont have a client to sync! + nodeIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{}) + nodeLister = corelisters.NewNodeLister(nodeIndexer) + nodeHasSynced = func() bool { return true } + klog.Info("Kubelet client is nil") } - nodeLister := corelisters.NewNodeLister(nodeIndexer) // construct a node reference used for events nodeRef := &v1.ObjectReference{ @@ -481,6 +501,7 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration, serviceLister: serviceLister, serviceHasSynced: serviceHasSynced, nodeLister: nodeLister, + nodeHasSynced: nodeHasSynced, masterServiceNamespace: masterServiceNamespace, streamingConnectionIdleTimeout: kubeCfg.StreamingConnectionIdleTimeout.Duration, recorder: kubeDeps.Recorder, @@ -885,7 +906,9 @@ type Kubelet struct { serviceHasSynced cache.InformerSynced // nodeLister knows how to list nodes nodeLister corelisters.NodeLister - + // nodeHasSynced indicates whether nodes have been sync'd at least once. + // Check this before trusting a response from the node lister. + nodeHasSynced cache.InformerSynced // a list of node labels to register nodeLabels map[string]string diff --git a/pkg/kubelet/kubelet_getters.go b/pkg/kubelet/kubelet_getters.go index f786a6921b2..e8e2146d744 100644 --- a/pkg/kubelet/kubelet_getters.go +++ b/pkg/kubelet/kubelet_getters.go @@ -22,6 +22,7 @@ import ( "io/ioutil" "net" "path/filepath" + "time" cadvisorapiv1 "github.com/google/cadvisor/info/v1" cadvisorv2 "github.com/google/cadvisor/info/v2" @@ -32,6 +33,7 @@ import ( v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/wait" "k8s.io/kubernetes/pkg/kubelet/cm" "k8s.io/kubernetes/pkg/kubelet/config" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" @@ -235,6 +237,15 @@ func (kl *Kubelet) GetNode() (*v1.Node, error) { if kl.kubeClient == nil { return kl.initialNode(context.TODO()) } + // if we have a valid kube client, we wait for initial lister to sync + if !kl.nodeHasSynced() { + err := wait.PollImmediate(time.Second, maxWaitForAPIServerSync, func() (bool, error) { + return kl.nodeHasSynced(), nil + }) + if err != nil { + return nil, fmt.Errorf("nodes have not yet been read at least once, cannot construct node object") + } + } return kl.nodeLister.Get(string(kl.nodeName)) } @@ -245,7 +256,7 @@ func (kl *Kubelet) GetNode() (*v1.Node, error) { // zero capacity, and the default labels. func (kl *Kubelet) getNodeAnyWay() (*v1.Node, error) { if kl.kubeClient != nil { - if n, err := kl.nodeLister.Get(string(kl.nodeName)); err == nil { + if n, err := kl.GetNode(); err == nil { return n, nil } } diff --git a/pkg/kubelet/kubelet_test.go b/pkg/kubelet/kubelet_test.go index 336af7ffeb1..dcb9ce9501c 100644 --- a/pkg/kubelet/kubelet_test.go +++ b/pkg/kubelet/kubelet_test.go @@ -182,6 +182,7 @@ func newTestKubeletWithImageList( kubelet.masterServiceNamespace = metav1.NamespaceDefault kubelet.serviceLister = testServiceLister{} kubelet.serviceHasSynced = func() bool { return true } + kubelet.nodeHasSynced = func() bool { return true } kubelet.nodeLister = testNodeLister{ nodes: []*v1.Node{ {