mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-23 11:50:44 +00:00
Merge pull request #99336 from neolit123/1.21-kublet-node-sync-fix
pkg/kubelet: improve the node informer sync check
This commit is contained in:
commit
232d93037c
@ -17,6 +17,8 @@ limitations under the License.
|
|||||||
package config
|
package config
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"time"
|
||||||
|
|
||||||
v1 "k8s.io/api/core/v1"
|
v1 "k8s.io/api/core/v1"
|
||||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
"k8s.io/apimachinery/pkg/fields"
|
"k8s.io/apimachinery/pkg/fields"
|
||||||
@ -24,13 +26,32 @@ import (
|
|||||||
"k8s.io/apimachinery/pkg/util/wait"
|
"k8s.io/apimachinery/pkg/util/wait"
|
||||||
clientset "k8s.io/client-go/kubernetes"
|
clientset "k8s.io/client-go/kubernetes"
|
||||||
"k8s.io/client-go/tools/cache"
|
"k8s.io/client-go/tools/cache"
|
||||||
|
"k8s.io/klog/v2"
|
||||||
kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
|
kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// WaitForAPIServerSyncPeriod is the period between checks for the node list/watch initial sync
|
||||||
|
const WaitForAPIServerSyncPeriod = 1 * time.Second
|
||||||
|
|
||||||
// NewSourceApiserver creates a config source that watches and pulls from the apiserver.
|
// NewSourceApiserver creates a config source that watches and pulls from the apiserver.
|
||||||
func NewSourceApiserver(c clientset.Interface, nodeName types.NodeName, updates chan<- interface{}) {
|
func NewSourceApiserver(c clientset.Interface, nodeName types.NodeName, nodeHasSynced func() bool, updates chan<- interface{}) {
|
||||||
lw := cache.NewListWatchFromClient(c.CoreV1().RESTClient(), "pods", metav1.NamespaceAll, fields.OneTermEqualSelector("spec.nodeName", string(nodeName)))
|
lw := cache.NewListWatchFromClient(c.CoreV1().RESTClient(), "pods", metav1.NamespaceAll, fields.OneTermEqualSelector("spec.nodeName", string(nodeName)))
|
||||||
newSourceApiserverFromLW(lw, updates)
|
|
||||||
|
// The Reflector responsible for watching pods at the apiserver should be run only after
|
||||||
|
// the node sync with the apiserver has completed.
|
||||||
|
klog.InfoS("Waiting for node sync before watching apiserver pods")
|
||||||
|
go func() {
|
||||||
|
for {
|
||||||
|
if nodeHasSynced() {
|
||||||
|
klog.V(4).InfoS("node sync completed")
|
||||||
|
break
|
||||||
|
}
|
||||||
|
time.Sleep(WaitForAPIServerSyncPeriod)
|
||||||
|
klog.V(4).InfoS("node sync has not completed yet")
|
||||||
|
}
|
||||||
|
klog.InfoS("Watching apiserver")
|
||||||
|
newSourceApiserverFromLW(lw, updates)
|
||||||
|
}()
|
||||||
}
|
}
|
||||||
|
|
||||||
// newSourceApiserverFromLW holds creates a config source that watches and pulls from the apiserver.
|
// newSourceApiserverFromLW holds creates a config source that watches and pulls from the apiserver.
|
||||||
|
@ -124,9 +124,6 @@ const (
|
|||||||
// Max amount of time to wait for the container runtime to come up.
|
// Max amount of time to wait for the container runtime to come up.
|
||||||
maxWaitForContainerRuntime = 30 * time.Second
|
maxWaitForContainerRuntime = 30 * time.Second
|
||||||
|
|
||||||
// Max amount of time to wait for node list/watch to initially sync
|
|
||||||
maxWaitForAPIServerSync = 10 * time.Second
|
|
||||||
|
|
||||||
// nodeStatusUpdateRetry specifies how many times kubelet retries when posting node status failed.
|
// nodeStatusUpdateRetry specifies how many times kubelet retries when posting node status failed.
|
||||||
nodeStatusUpdateRetry = 5
|
nodeStatusUpdateRetry = 5
|
||||||
|
|
||||||
@ -257,7 +254,7 @@ type DockerOptions struct {
|
|||||||
|
|
||||||
// makePodSourceConfig creates a config.PodConfig from the given
|
// makePodSourceConfig creates a config.PodConfig from the given
|
||||||
// KubeletConfiguration or returns an error.
|
// KubeletConfiguration or returns an error.
|
||||||
func makePodSourceConfig(kubeCfg *kubeletconfiginternal.KubeletConfiguration, kubeDeps *Dependencies, nodeName types.NodeName) (*config.PodConfig, error) {
|
func makePodSourceConfig(kubeCfg *kubeletconfiginternal.KubeletConfiguration, kubeDeps *Dependencies, nodeName types.NodeName, nodeHasSynced func() bool) (*config.PodConfig, error) {
|
||||||
manifestURLHeader := make(http.Header)
|
manifestURLHeader := make(http.Header)
|
||||||
if len(kubeCfg.StaticPodURLHeader) > 0 {
|
if len(kubeCfg.StaticPodURLHeader) > 0 {
|
||||||
for k, v := range kubeCfg.StaticPodURLHeader {
|
for k, v := range kubeCfg.StaticPodURLHeader {
|
||||||
@ -283,8 +280,8 @@ func makePodSourceConfig(kubeCfg *kubeletconfiginternal.KubeletConfiguration, ku
|
|||||||
}
|
}
|
||||||
|
|
||||||
if kubeDeps.KubeClient != nil {
|
if kubeDeps.KubeClient != nil {
|
||||||
klog.InfoS("Watching apiserver")
|
klog.InfoS("Adding apiserver pod source")
|
||||||
config.NewSourceApiserver(kubeDeps.KubeClient, nodeName, cfg.Channel(kubetypes.ApiserverSource))
|
config.NewSourceApiserver(kubeDeps.KubeClient, nodeName, nodeHasSynced, cfg.Channel(kubetypes.ApiserverSource))
|
||||||
}
|
}
|
||||||
return cfg, nil
|
return cfg, nil
|
||||||
}
|
}
|
||||||
@ -390,9 +387,32 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var nodeHasSynced cache.InformerSynced
|
||||||
|
var nodeLister corelisters.NodeLister
|
||||||
|
|
||||||
|
// If kubeClient == nil, we are running in standalone mode (i.e. no API servers)
|
||||||
|
// If not nil, we are running as part of a cluster and should sync w/API
|
||||||
|
if kubeDeps.KubeClient != nil {
|
||||||
|
kubeInformers := informers.NewSharedInformerFactoryWithOptions(kubeDeps.KubeClient, 0, informers.WithTweakListOptions(func(options *metav1.ListOptions) {
|
||||||
|
options.FieldSelector = fields.Set{metav1.ObjectNameField: string(nodeName)}.String()
|
||||||
|
}))
|
||||||
|
nodeLister = kubeInformers.Core().V1().Nodes().Lister()
|
||||||
|
nodeHasSynced = func() bool {
|
||||||
|
return kubeInformers.Core().V1().Nodes().Informer().HasSynced()
|
||||||
|
}
|
||||||
|
kubeInformers.Start(wait.NeverStop)
|
||||||
|
klog.InfoS("Attempting to sync node with API server")
|
||||||
|
} else {
|
||||||
|
// we don't have a client to sync!
|
||||||
|
nodeIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{})
|
||||||
|
nodeLister = corelisters.NewNodeLister(nodeIndexer)
|
||||||
|
nodeHasSynced = func() bool { return true }
|
||||||
|
klog.InfoS("Kubelet is running in standalone mode, will skip API server sync")
|
||||||
|
}
|
||||||
|
|
||||||
if kubeDeps.PodConfig == nil {
|
if kubeDeps.PodConfig == nil {
|
||||||
var err error
|
var err error
|
||||||
kubeDeps.PodConfig, err = makePodSourceConfig(kubeCfg, kubeDeps, nodeName)
|
kubeDeps.PodConfig, err = makePodSourceConfig(kubeCfg, kubeDeps, nodeName, nodeHasSynced)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@ -433,8 +453,6 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
|
|||||||
|
|
||||||
var serviceLister corelisters.ServiceLister
|
var serviceLister corelisters.ServiceLister
|
||||||
var serviceHasSynced cache.InformerSynced
|
var serviceHasSynced cache.InformerSynced
|
||||||
// If kubeClient == nil, we are running in standalone mode (i.e. no API servers)
|
|
||||||
// If not nil, we are running as part of a cluster and should sync w/API
|
|
||||||
if kubeDeps.KubeClient != nil {
|
if kubeDeps.KubeClient != nil {
|
||||||
kubeInformers := informers.NewSharedInformerFactory(kubeDeps.KubeClient, 0)
|
kubeInformers := informers.NewSharedInformerFactory(kubeDeps.KubeClient, 0)
|
||||||
serviceLister = kubeInformers.Core().V1().Services().Lister()
|
serviceLister = kubeInformers.Core().V1().Services().Lister()
|
||||||
@ -446,31 +464,6 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
|
|||||||
serviceHasSynced = func() bool { return true }
|
serviceHasSynced = func() bool { return true }
|
||||||
}
|
}
|
||||||
|
|
||||||
var nodeHasSynced cache.InformerSynced
|
|
||||||
var nodeLister corelisters.NodeLister
|
|
||||||
|
|
||||||
if kubeDeps.KubeClient != nil {
|
|
||||||
kubeInformers := informers.NewSharedInformerFactoryWithOptions(kubeDeps.KubeClient, 0, informers.WithTweakListOptions(func(options *metav1.ListOptions) {
|
|
||||||
options.FieldSelector = fields.Set{metav1.ObjectNameField: string(nodeName)}.String()
|
|
||||||
}))
|
|
||||||
nodeLister = kubeInformers.Core().V1().Nodes().Lister()
|
|
||||||
nodeHasSynced = func() bool {
|
|
||||||
if kubeInformers.Core().V1().Nodes().Informer().HasSynced() {
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
klog.InfoS("Kubelet nodes not sync")
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
kubeInformers.Start(wait.NeverStop)
|
|
||||||
klog.InfoS("Kubelet client is not nil")
|
|
||||||
} else {
|
|
||||||
// we don't have a client to sync!
|
|
||||||
nodeIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{})
|
|
||||||
nodeLister = corelisters.NewNodeLister(nodeIndexer)
|
|
||||||
nodeHasSynced = func() bool { return true }
|
|
||||||
klog.InfoS("Kubelet client is nil")
|
|
||||||
}
|
|
||||||
|
|
||||||
// construct a node reference used for events
|
// construct a node reference used for events
|
||||||
nodeRef := &v1.ObjectReference{
|
nodeRef := &v1.ObjectReference{
|
||||||
Kind: "Node",
|
Kind: "Node",
|
||||||
|
@ -22,7 +22,6 @@ import (
|
|||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
"net"
|
"net"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
"time"
|
|
||||||
|
|
||||||
cadvisorapiv1 "github.com/google/cadvisor/info/v1"
|
cadvisorapiv1 "github.com/google/cadvisor/info/v1"
|
||||||
cadvisorv2 "github.com/google/cadvisor/info/v2"
|
cadvisorv2 "github.com/google/cadvisor/info/v2"
|
||||||
@ -33,7 +32,6 @@ import (
|
|||||||
|
|
||||||
v1 "k8s.io/api/core/v1"
|
v1 "k8s.io/api/core/v1"
|
||||||
"k8s.io/apimachinery/pkg/types"
|
"k8s.io/apimachinery/pkg/types"
|
||||||
"k8s.io/apimachinery/pkg/util/wait"
|
|
||||||
"k8s.io/kubernetes/pkg/kubelet/cm"
|
"k8s.io/kubernetes/pkg/kubelet/cm"
|
||||||
"k8s.io/kubernetes/pkg/kubelet/config"
|
"k8s.io/kubernetes/pkg/kubelet/config"
|
||||||
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
|
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
|
||||||
@ -237,15 +235,6 @@ func (kl *Kubelet) GetNode() (*v1.Node, error) {
|
|||||||
if kl.kubeClient == nil {
|
if kl.kubeClient == nil {
|
||||||
return kl.initialNode(context.TODO())
|
return kl.initialNode(context.TODO())
|
||||||
}
|
}
|
||||||
// if we have a valid kube client, we wait for initial lister to sync
|
|
||||||
if !kl.nodeHasSynced() {
|
|
||||||
err := wait.PollImmediate(time.Second, maxWaitForAPIServerSync, func() (bool, error) {
|
|
||||||
return kl.nodeHasSynced(), nil
|
|
||||||
})
|
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("nodes have not yet been read at least once, cannot construct node object")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return kl.nodeLister.Get(string(kl.nodeName))
|
return kl.nodeLister.Get(string(kl.nodeName))
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -256,7 +245,7 @@ func (kl *Kubelet) GetNode() (*v1.Node, error) {
|
|||||||
// zero capacity, and the default labels.
|
// zero capacity, and the default labels.
|
||||||
func (kl *Kubelet) getNodeAnyWay() (*v1.Node, error) {
|
func (kl *Kubelet) getNodeAnyWay() (*v1.Node, error) {
|
||||||
if kl.kubeClient != nil {
|
if kl.kubeClient != nil {
|
||||||
if n, err := kl.GetNode(); err == nil {
|
if n, err := kl.nodeLister.Get(string(kl.nodeName)); err == nil {
|
||||||
return n, nil
|
return n, nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user