mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-23 19:56:01 +00:00
Merge pull request #94087 from derekwaynecarr/node-sync-once
kubelet waits for node lister to sync at least once
This commit is contained in:
commit
4e93dbcd0d
@ -124,6 +124,9 @@ const (
|
|||||||
// Max amount of time to wait for the container runtime to come up.
|
// Max amount of time to wait for the container runtime to come up.
|
||||||
maxWaitForContainerRuntime = 30 * time.Second
|
maxWaitForContainerRuntime = 30 * time.Second
|
||||||
|
|
||||||
|
// Max amount of time to wait for node list/watch to initially sync
|
||||||
|
maxWaitForAPIServerSync = 10 * time.Second
|
||||||
|
|
||||||
// nodeStatusUpdateRetry specifies how many times kubelet retries when posting node status failed.
|
// nodeStatusUpdateRetry specifies how many times kubelet retries when posting node status failed.
|
||||||
nodeStatusUpdateRetry = 5
|
nodeStatusUpdateRetry = 5
|
||||||
|
|
||||||
@ -431,14 +434,31 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
|
|||||||
serviceHasSynced = func() bool { return true }
|
serviceHasSynced = func() bool { return true }
|
||||||
}
|
}
|
||||||
|
|
||||||
nodeIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{})
|
var nodeHasSynced cache.InformerSynced
|
||||||
|
var nodeLister corelisters.NodeLister
|
||||||
|
|
||||||
if kubeDeps.KubeClient != nil {
|
if kubeDeps.KubeClient != nil {
|
||||||
fieldSelector := fields.Set{api.ObjectNameField: string(nodeName)}.AsSelector()
|
kubeInformers := informers.NewSharedInformerFactoryWithOptions(kubeDeps.KubeClient, 0, informers.WithTweakListOptions(func(options *metav1.ListOptions) {
|
||||||
nodeLW := cache.NewListWatchFromClient(kubeDeps.KubeClient.CoreV1().RESTClient(), "nodes", metav1.NamespaceAll, fieldSelector)
|
options.FieldSelector = fields.Set{api.ObjectNameField: string(nodeName)}.String()
|
||||||
r := cache.NewReflector(nodeLW, &v1.Node{}, nodeIndexer, 0)
|
}))
|
||||||
go r.Run(wait.NeverStop)
|
nodeLister = kubeInformers.Core().V1().Nodes().Lister()
|
||||||
|
nodeHasSynced = func() bool {
|
||||||
|
if kubeInformers.Core().V1().Nodes().Informer().HasSynced() {
|
||||||
|
klog.Infof("kubelet nodes sync")
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
klog.Infof("kubelet nodes not sync")
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
kubeInformers.Start(wait.NeverStop)
|
||||||
|
klog.Info("Kubelet client is not nil")
|
||||||
|
} else {
|
||||||
|
// we dont have a client to sync!
|
||||||
|
nodeIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{})
|
||||||
|
nodeLister = corelisters.NewNodeLister(nodeIndexer)
|
||||||
|
nodeHasSynced = func() bool { return true }
|
||||||
|
klog.Info("Kubelet client is nil")
|
||||||
}
|
}
|
||||||
nodeLister := corelisters.NewNodeLister(nodeIndexer)
|
|
||||||
|
|
||||||
// construct a node reference used for events
|
// construct a node reference used for events
|
||||||
nodeRef := &v1.ObjectReference{
|
nodeRef := &v1.ObjectReference{
|
||||||
@ -481,6 +501,7 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
|
|||||||
serviceLister: serviceLister,
|
serviceLister: serviceLister,
|
||||||
serviceHasSynced: serviceHasSynced,
|
serviceHasSynced: serviceHasSynced,
|
||||||
nodeLister: nodeLister,
|
nodeLister: nodeLister,
|
||||||
|
nodeHasSynced: nodeHasSynced,
|
||||||
masterServiceNamespace: masterServiceNamespace,
|
masterServiceNamespace: masterServiceNamespace,
|
||||||
streamingConnectionIdleTimeout: kubeCfg.StreamingConnectionIdleTimeout.Duration,
|
streamingConnectionIdleTimeout: kubeCfg.StreamingConnectionIdleTimeout.Duration,
|
||||||
recorder: kubeDeps.Recorder,
|
recorder: kubeDeps.Recorder,
|
||||||
@ -885,7 +906,9 @@ type Kubelet struct {
|
|||||||
serviceHasSynced cache.InformerSynced
|
serviceHasSynced cache.InformerSynced
|
||||||
// nodeLister knows how to list nodes
|
// nodeLister knows how to list nodes
|
||||||
nodeLister corelisters.NodeLister
|
nodeLister corelisters.NodeLister
|
||||||
|
// nodeHasSynced indicates whether nodes have been sync'd at least once.
|
||||||
|
// Check this before trusting a response from the node lister.
|
||||||
|
nodeHasSynced cache.InformerSynced
|
||||||
// a list of node labels to register
|
// a list of node labels to register
|
||||||
nodeLabels map[string]string
|
nodeLabels map[string]string
|
||||||
|
|
||||||
|
@ -22,6 +22,7 @@ import (
|
|||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
"net"
|
"net"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
|
"time"
|
||||||
|
|
||||||
cadvisorapiv1 "github.com/google/cadvisor/info/v1"
|
cadvisorapiv1 "github.com/google/cadvisor/info/v1"
|
||||||
cadvisorv2 "github.com/google/cadvisor/info/v2"
|
cadvisorv2 "github.com/google/cadvisor/info/v2"
|
||||||
@ -32,6 +33,7 @@ import (
|
|||||||
|
|
||||||
v1 "k8s.io/api/core/v1"
|
v1 "k8s.io/api/core/v1"
|
||||||
"k8s.io/apimachinery/pkg/types"
|
"k8s.io/apimachinery/pkg/types"
|
||||||
|
"k8s.io/apimachinery/pkg/util/wait"
|
||||||
"k8s.io/kubernetes/pkg/kubelet/cm"
|
"k8s.io/kubernetes/pkg/kubelet/cm"
|
||||||
"k8s.io/kubernetes/pkg/kubelet/config"
|
"k8s.io/kubernetes/pkg/kubelet/config"
|
||||||
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
|
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
|
||||||
@ -235,6 +237,15 @@ func (kl *Kubelet) GetNode() (*v1.Node, error) {
|
|||||||
if kl.kubeClient == nil {
|
if kl.kubeClient == nil {
|
||||||
return kl.initialNode(context.TODO())
|
return kl.initialNode(context.TODO())
|
||||||
}
|
}
|
||||||
|
// if we have a valid kube client, we wait for initial lister to sync
|
||||||
|
if !kl.nodeHasSynced() {
|
||||||
|
err := wait.PollImmediate(time.Second, maxWaitForAPIServerSync, func() (bool, error) {
|
||||||
|
return kl.nodeHasSynced(), nil
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("nodes have not yet been read at least once, cannot construct node object")
|
||||||
|
}
|
||||||
|
}
|
||||||
return kl.nodeLister.Get(string(kl.nodeName))
|
return kl.nodeLister.Get(string(kl.nodeName))
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -245,7 +256,7 @@ func (kl *Kubelet) GetNode() (*v1.Node, error) {
|
|||||||
// zero capacity, and the default labels.
|
// zero capacity, and the default labels.
|
||||||
func (kl *Kubelet) getNodeAnyWay() (*v1.Node, error) {
|
func (kl *Kubelet) getNodeAnyWay() (*v1.Node, error) {
|
||||||
if kl.kubeClient != nil {
|
if kl.kubeClient != nil {
|
||||||
if n, err := kl.nodeLister.Get(string(kl.nodeName)); err == nil {
|
if n, err := kl.GetNode(); err == nil {
|
||||||
return n, nil
|
return n, nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -182,6 +182,7 @@ func newTestKubeletWithImageList(
|
|||||||
kubelet.masterServiceNamespace = metav1.NamespaceDefault
|
kubelet.masterServiceNamespace = metav1.NamespaceDefault
|
||||||
kubelet.serviceLister = testServiceLister{}
|
kubelet.serviceLister = testServiceLister{}
|
||||||
kubelet.serviceHasSynced = func() bool { return true }
|
kubelet.serviceHasSynced = func() bool { return true }
|
||||||
|
kubelet.nodeHasSynced = func() bool { return true }
|
||||||
kubelet.nodeLister = testNodeLister{
|
kubelet.nodeLister = testNodeLister{
|
||||||
nodes: []*v1.Node{
|
nodes: []*v1.Node{
|
||||||
{
|
{
|
||||||
|
Loading…
Reference in New Issue
Block a user