From 752135242e49dc571159eb51328bcfc7734e6033 Mon Sep 17 00:00:00 2001 From: Derek Carr Date: Tue, 18 Aug 2020 15:24:03 -0400 Subject: [PATCH] WIP: node sync at least once --- pkg/kubelet/kubelet.go | 37 +++++++++++++++++++++++++++------- pkg/kubelet/kubelet_getters.go | 13 +++++++++++- pkg/kubelet/kubelet_test.go | 1 + 3 files changed, 43 insertions(+), 8 deletions(-) diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index b039ab963c5..c5d2ee6f0db 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -125,6 +125,9 @@ const ( // Max amount of time to wait for the container runtime to come up. maxWaitForContainerRuntime = 30 * time.Second + // Max amount of time to wait for node list/watch to initially sync + maxWaitForAPIServerSync = 10 * time.Second + // nodeStatusUpdateRetry specifies how many times kubelet retries when posting node status failed. nodeStatusUpdateRetry = 5 @@ -431,14 +434,31 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration, serviceHasSynced = func() bool { return true } } - nodeIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{}) + var nodeHasSynced cache.InformerSynced + var nodeLister corelisters.NodeLister + if kubeDeps.KubeClient != nil { - fieldSelector := fields.Set{api.ObjectNameField: string(nodeName)}.AsSelector() - nodeLW := cache.NewListWatchFromClient(kubeDeps.KubeClient.CoreV1().RESTClient(), "nodes", metav1.NamespaceAll, fieldSelector) - r := cache.NewReflector(nodeLW, &v1.Node{}, nodeIndexer, 0) - go r.Run(wait.NeverStop) + kubeInformers := informers.NewSharedInformerFactoryWithOptions(kubeDeps.KubeClient, 0, informers.WithTweakListOptions(func(options *metav1.ListOptions) { + options.FieldSelector = fields.Set{api.ObjectNameField: string(nodeName)}.String() + })) + nodeLister = kubeInformers.Core().V1().Nodes().Lister() + nodeHasSynced = func() bool { + if kubeInformers.Core().V1().Nodes().Informer().HasSynced() { + klog.Infof("kubelet nodes sync") + return 
true + } + klog.Infof("kubelet nodes not sync") + return false + } + kubeInformers.Start(wait.NeverStop) + klog.Info("Kubelet client is not nil") + } else { + // we don't have a client to sync! + nodeIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{}) + nodeLister = corelisters.NewNodeLister(nodeIndexer) + nodeHasSynced = func() bool { return true } + klog.Info("Kubelet client is nil") } - nodeLister := corelisters.NewNodeLister(nodeIndexer) // TODO: get the real node object of ourself, // and use the real node name and UID. @@ -489,6 +509,7 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration, serviceLister: serviceLister, serviceHasSynced: serviceHasSynced, nodeLister: nodeLister, + nodeHasSynced: nodeHasSynced, masterServiceNamespace: masterServiceNamespace, streamingConnectionIdleTimeout: kubeCfg.StreamingConnectionIdleTimeout.Duration, recorder: kubeDeps.Recorder, @@ -873,7 +894,9 @@ type Kubelet struct { serviceHasSynced cache.InformerSynced // nodeLister knows how to list nodes nodeLister corelisters.NodeLister - + // nodeHasSynced indicates whether nodes have been sync'd at least once. + // Check this before trusting a response from the node lister. 
+ nodeHasSynced cache.InformerSynced // a list of node labels to register nodeLabels map[string]string diff --git a/pkg/kubelet/kubelet_getters.go b/pkg/kubelet/kubelet_getters.go index aed08db4c2f..dbe5ff53f43 100644 --- a/pkg/kubelet/kubelet_getters.go +++ b/pkg/kubelet/kubelet_getters.go @@ -22,6 +22,7 @@ import ( "io/ioutil" "net" "path/filepath" + "time" cadvisorapiv1 "github.com/google/cadvisor/info/v1" cadvisorv2 "github.com/google/cadvisor/info/v2" @@ -32,6 +33,7 @@ import ( v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/wait" "k8s.io/kubernetes/pkg/kubelet/cm" "k8s.io/kubernetes/pkg/kubelet/config" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" @@ -235,6 +237,15 @@ func (kl *Kubelet) GetNode() (*v1.Node, error) { if kl.kubeClient == nil { return kl.initialNode(context.TODO()) } + // if we have a valid kube client, we wait for initial lister to sync + if !kl.nodeHasSynced() { + err := wait.PollImmediate(time.Second, maxWaitForAPIServerSync, func() (bool, error) { + return kl.nodeHasSynced(), nil + }) + if err != nil { + return nil, fmt.Errorf("nodes have not yet been read at least once, cannot construct node object") + } + } return kl.nodeLister.Get(string(kl.nodeName)) } @@ -245,7 +256,7 @@ func (kl *Kubelet) GetNode() (*v1.Node, error) { // zero capacity, and the default labels. 
func (kl *Kubelet) getNodeAnyWay() (*v1.Node, error) { if kl.kubeClient != nil { - if n, err := kl.nodeLister.Get(string(kl.nodeName)); err == nil { + if n, err := kl.GetNode(); err == nil { return n, nil } } diff --git a/pkg/kubelet/kubelet_test.go b/pkg/kubelet/kubelet_test.go index 785515f7c8c..c0decccfa31 100644 --- a/pkg/kubelet/kubelet_test.go +++ b/pkg/kubelet/kubelet_test.go @@ -182,6 +182,7 @@ func newTestKubeletWithImageList( kubelet.masterServiceNamespace = metav1.NamespaceDefault kubelet.serviceLister = testServiceLister{} kubelet.serviceHasSynced = func() bool { return true } + kubelet.nodeHasSynced = func() bool { return true } kubelet.nodeLister = testNodeLister{ nodes: []*v1.Node{ {