diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go
index 3bf170bb3b4..9c85fa2f2dd 100644
--- a/pkg/kubelet/kubelet.go
+++ b/pkg/kubelet/kubelet.go
@@ -692,7 +692,9 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
 	klet.pleg = pleg.NewGenericPLEG(klet.containerRuntime, plegChannelCapacity, plegRelistPeriod, klet.podCache, clock.RealClock{})
 	klet.runtimeState = newRuntimeState(maxWaitForContainerRuntime)
 	klet.runtimeState.addHealthCheck("PLEG", klet.pleg.Healthy)
-	klet.updatePodCIDR(kubeCfg.PodCIDR)
+	if err := klet.updatePodCIDR(kubeCfg.PodCIDR); err != nil {
+		glog.Errorf("Pod CIDR update failed %v", err)
+	}
 
 	// setup containerGC
 	containerGC, err := kubecontainer.NewContainerGC(klet.containerRuntime, containerGCPolicy, klet.sourcesReady)
@@ -1011,6 +1013,18 @@ type Kubelet struct {
 	// as it takes time to gather all necessary node information.
 	nodeStatusUpdateFrequency time.Duration
 
+	// syncNodeStatusMux is a lock on updating the node status, because this path is not thread-safe.
+	// This lock is used by Kubelet.syncNodeStatus function and shouldn't be used anywhere else.
+	syncNodeStatusMux sync.Mutex
+
+	// updatePodCIDRMux is a lock on updating pod CIDR, because this path is not thread-safe.
+	// This lock is used by Kubelet.updatePodCIDR function and shouldn't be used anywhere else.
+	updatePodCIDRMux sync.Mutex
+
+	// updateRuntimeMux is a lock on updating runtime, because this path is not thread-safe.
+	// This lock is used by Kubelet.updateRuntimeUp function and shouldn't be used anywhere else.
+	updateRuntimeMux sync.Mutex
+
 	// Generates pod events.
 	pleg pleg.PodLifecycleEventGenerator
 
@@ -1347,6 +1361,7 @@ func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) {
 	if kl.kubeClient != nil {
 		// Start syncing node status immediately, this may set up things the runtime needs to run.
 		go wait.Until(kl.syncNodeStatus, kl.nodeStatusUpdateFrequency, wait.NeverStop)
+		go kl.fastStatusUpdateOnce()
 	}
 
 	go wait.Until(kl.updateRuntimeUp, 5*time.Second, wait.NeverStop)
@@ -2079,6 +2094,9 @@ func (kl *Kubelet) LatestLoopEntryTime() time.Time {
 // and returns an error if the status check fails. If the status check is OK,
 // update the container runtime uptime in the kubelet runtimeState.
 func (kl *Kubelet) updateRuntimeUp() {
+	kl.updateRuntimeMux.Lock()
+	defer kl.updateRuntimeMux.Unlock()
+
 	s, err := kl.containerRuntime.Status()
 	if err != nil {
 		glog.Errorf("Container runtime sanity check failed: %v", err)
@@ -2153,6 +2171,31 @@ func (kl *Kubelet) cleanUpContainersInPod(podID types.UID, exitedContainerID str
 	}
 }
 
+// fastStatusUpdateOnce starts a loop that checks the internal node indexer cache for when a CIDR
+// is applied and tries to update pod CIDR immediately. After pod CIDR is updated it fires off
+// a runtime update and a node status update. Function returns after one successful node status update.
+// Function is executed only during Kubelet start which improves latency to ready node by updating
+// pod CIDR, runtime status and node statuses ASAP.
+func (kl *Kubelet) fastStatusUpdateOnce() {
+	for {
+		time.Sleep(100 * time.Millisecond)
+		node, err := kl.GetNode()
+		if err != nil {
+			glog.Errorf(err.Error())
+			continue
+		}
+		if node.Spec.PodCIDR != "" {
+			if err := kl.updatePodCIDR(node.Spec.PodCIDR); err != nil {
+				glog.Errorf("Pod CIDR update failed %v", err)
+				continue
+			}
+			kl.updateRuntimeUp()
+			kl.syncNodeStatus()
+			return
+		}
+	}
+}
+
 // isSyncPodWorthy filters out events that are not worthy of pod syncing
 func isSyncPodWorthy(event *pleg.PodLifecycleEvent) bool {
 	// ContatnerRemoved doesn't affect pod state
diff --git a/pkg/kubelet/kubelet_network.go b/pkg/kubelet/kubelet_network.go
index fd7a84e4846..a07ece7bb5c 100644
--- a/pkg/kubelet/kubelet_network.go
+++ b/pkg/kubelet/kubelet_network.go
@@ -56,22 +56,25 @@ func (kl *Kubelet) providerRequiresNetworkingConfiguration() bool {
 
 // updatePodCIDR updates the pod CIDR in the runtime state if it is different
 // from the current CIDR.
-func (kl *Kubelet) updatePodCIDR(cidr string) {
+func (kl *Kubelet) updatePodCIDR(cidr string) error {
+	kl.updatePodCIDRMux.Lock()
+	defer kl.updatePodCIDRMux.Unlock()
+
 	podCIDR := kl.runtimeState.podCIDR()
 
 	if podCIDR == cidr {
-		return
+		return nil
 	}
 
 	// kubelet -> generic runtime -> runtime shim -> network plugin
 	// docker/non-cri implementations have a passthrough UpdatePodCIDR
 	if err := kl.getRuntime().UpdatePodCIDR(cidr); err != nil {
-		glog.Errorf("Failed to update pod CIDR: %v", err)
-		return
+		return fmt.Errorf("failed to update pod CIDR: %v", err)
 	}
 
 	glog.Infof("Setting Pod CIDR: %v -> %v", podCIDR, cidr)
 	kl.runtimeState.setPodCIDR(cidr)
+	return nil
 }
 
 // syncNetworkUtil ensures the network utility are present on host.
diff --git a/pkg/kubelet/kubelet_node_status.go b/pkg/kubelet/kubelet_node_status.go
index bb2117fe579..4c54d012208 100644
--- a/pkg/kubelet/kubelet_node_status.go
+++ b/pkg/kubelet/kubelet_node_status.go
@@ -351,6 +351,9 @@ func (kl *Kubelet) initialNode() (*v1.Node, error) {
 // It synchronizes node status to master, registering the kubelet first if
 // necessary.
 func (kl *Kubelet) syncNodeStatus() {
+	kl.syncNodeStatusMux.Lock()
+	defer kl.syncNodeStatusMux.Unlock()
+
 	if kl.kubeClient == nil || kl.heartbeatClient == nil {
 		return
 	}
@@ -402,7 +405,9 @@ func (kl *Kubelet) tryUpdateNodeStatus(tryNumber int) error {
 	}
 
 	if node.Spec.PodCIDR != "" {
-		kl.updatePodCIDR(node.Spec.PodCIDR)
+		if err := kl.updatePodCIDR(node.Spec.PodCIDR); err != nil {
+			glog.Errorf(err.Error())
+		}
 	}
 
 	kl.setNodeStatus(node)
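The concurrency pattern the diff relies on can be seen in isolation in the following minimal, standalone sketch. It is not kubelet code: the state type, its setCIDR method, and the CIDR value are invented for illustration. It shows a mutex-guarded, idempotent update function shared by a slow periodic path (analogous to syncNodeStatus on nodeStatusUpdateFrequency) and a fast one-shot path (analogous to fastStatusUpdateOnce), so whichever path runs first does the work and the other becomes a no-op.

// Illustrative sketch only; mirrors the shape, not the code, of the paths touched by this diff.
package main

import (
	"fmt"
	"sync"
	"time"
)

type state struct {
	mu   sync.Mutex // plays the role of updatePodCIDRMux
	cidr string
}

// setCIDR is a hypothetical stand-in for updatePodCIDR after the diff:
// take the lock, skip the work if nothing changed, apply, return an error instead of only logging.
func (s *state) setCIDR(cidr string) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.cidr == cidr {
		return nil // already applied; concurrent callers become no-ops
	}
	fmt.Printf("setting CIDR %q -> %q\n", s.cidr, cidr)
	s.cidr = cidr
	return nil
}

func main() {
	s := &state{}
	var wg sync.WaitGroup
	wg.Add(2)

	// Slow periodic path: the regular sync arrives later.
	go func() {
		defer wg.Done()
		time.Sleep(300 * time.Millisecond)
		_ = s.setCIDR("10.0.0.0/24")
	}()

	// Fast path: poll every 100ms and return after the first successful update,
	// like fastStatusUpdateOnce returning after one successful node status update.
	go func() {
		defer wg.Done()
		for {
			time.Sleep(100 * time.Millisecond)
			if err := s.setCIDR("10.0.0.0/24"); err == nil {
				return
			}
		}
	}()

	wg.Wait()
}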