Merge pull request #116706 from pacoxu/deflake-kubemark-data-race

kubelet: fix data races
This commit is contained in:
Kubernetes Prow Robot 2023-03-17 08:41:26 -07:00 committed by GitHub
commit aa0fea6944
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 19 additions and 8 deletions

View File

@ -20,6 +20,7 @@ package options
import ( import (
"fmt" "fmt"
_ "net/http/pprof" // Enable pprof HTTP handlers. _ "net/http/pprof" // Enable pprof HTTP handlers.
"path/filepath"
"strings" "strings"
"github.com/spf13/pflag" "github.com/spf13/pflag"
@ -138,7 +139,7 @@ func NewKubeletFlags() *KubeletFlags {
return &KubeletFlags{ return &KubeletFlags{
ContainerRuntimeOptions: *NewContainerRuntimeOptions(), ContainerRuntimeOptions: *NewContainerRuntimeOptions(),
CertDirectory: "/var/lib/kubelet/pki", CertDirectory: "/var/lib/kubelet/pki",
RootDirectory: defaultRootDir, RootDirectory: filepath.Clean(defaultRootDir),
MaxContainerCount: -1, MaxContainerCount: -1,
MaxPerPodContainerCount: 1, MaxPerPodContainerCount: 1,
MinimumGCAge: metav1.Duration{Duration: 0}, MinimumGCAge: metav1.Duration{Duration: 0},

View File

@ -523,7 +523,7 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
kubeClient: kubeDeps.KubeClient, kubeClient: kubeDeps.KubeClient,
heartbeatClient: kubeDeps.HeartbeatClient, heartbeatClient: kubeDeps.HeartbeatClient,
onRepeatedHeartbeatFailure: kubeDeps.OnHeartbeatFailure, onRepeatedHeartbeatFailure: kubeDeps.OnHeartbeatFailure,
rootDirectory: rootDirectory, rootDirectory: filepath.Clean(rootDirectory),
resyncInterval: kubeCfg.SyncFrequency.Duration, resyncInterval: kubeCfg.SyncFrequency.Duration,
sourcesReady: config.NewSourcesReady(kubeDeps.PodConfig.SeenAllSources), sourcesReady: config.NewSourcesReady(kubeDeps.PodConfig.SeenAllSources),
registerNode: registerNode, registerNode: registerNode,
@ -1321,7 +1321,9 @@ func (kl *Kubelet) RlimitStats() (*statsapi.RlimitStats, error) {
// 4. the pod-resources directory // 4. the pod-resources directory
// 5. the checkpoint directory // 5. the checkpoint directory
func (kl *Kubelet) setupDataDirs() error { func (kl *Kubelet) setupDataDirs() error {
kl.rootDirectory = filepath.Clean(kl.rootDirectory) if cleanedRoot := filepath.Clean(kl.rootDirectory); cleanedRoot != kl.rootDirectory {
return fmt.Errorf("rootDirectory not in canonical form: expected %s, was %s", cleanedRoot, kl.rootDirectory)
}
pluginRegistrationDir := kl.getPluginsRegistrationDir() pluginRegistrationDir := kl.getPluginsRegistrationDir()
pluginsDir := kl.getPluginsDir() pluginsDir := kl.getPluginsDir()
if err := os.MkdirAll(kl.getRootDir(), 0750); err != nil { if err := os.MkdirAll(kl.getRootDir(), 0750); err != nil {

View File

@ -19,6 +19,7 @@ package kubelet
import ( import (
"context" "context"
"os" "os"
"path/filepath"
"testing" "testing"
"time" "time"
@ -81,7 +82,7 @@ func TestRunOnce(t *testing.T) {
} }
defer os.RemoveAll(basePath) defer os.RemoveAll(basePath)
kb := &Kubelet{ kb := &Kubelet{
rootDirectory: basePath, rootDirectory: filepath.Clean(basePath),
recorder: &record.FakeRecorder{}, recorder: &record.FakeRecorder{},
cadvisor: cadvisor, cadvisor: cadvisor,
nodeLister: testNodeLister{}, nodeLister: testNodeLister{},

View File

@ -18,6 +18,7 @@ package reconciler
import ( import (
"fmt" "fmt"
"sync"
"time" "time"
v1 "k8s.io/api/core/v1" v1 "k8s.io/api/core/v1"
@ -139,6 +140,8 @@ type reconciler struct {
volumePluginMgr *volumepkg.VolumePluginMgr volumePluginMgr *volumepkg.VolumePluginMgr
skippedDuringReconstruction map[v1.UniqueVolumeName]*globalVolumeInfo skippedDuringReconstruction map[v1.UniqueVolumeName]*globalVolumeInfo
kubeletPodsDir string kubeletPodsDir string
// lock protects timeOfLastSync for updating and checking
timeOfLastSyncLock sync.Mutex
timeOfLastSync time.Time timeOfLastSync time.Time
volumesFailedReconstruction []podVolume volumesFailedReconstruction []podVolume
volumesNeedDevicePath []v1.UniqueVolumeName volumesNeedDevicePath []v1.UniqueVolumeName

View File

@ -72,10 +72,14 @@ type globalVolumeInfo struct {
} }
func (rc *reconciler) updateLastSyncTime() { func (rc *reconciler) updateLastSyncTime() {
rc.timeOfLastSyncLock.Lock()
defer rc.timeOfLastSyncLock.Unlock()
rc.timeOfLastSync = time.Now() rc.timeOfLastSync = time.Now()
} }
func (rc *reconciler) StatesHasBeenSynced() bool { func (rc *reconciler) StatesHasBeenSynced() bool {
rc.timeOfLastSyncLock.Lock()
defer rc.timeOfLastSyncLock.Unlock()
return !rc.timeOfLastSync.IsZero() return !rc.timeOfLastSync.IsZero()
} }