From ecf2b402be8e1b862f8967c8a63894d6ce96e1bb Mon Sep 17 00:00:00 2001 From: HirazawaUi <695097494plus@gmail.com> Date: Wed, 24 Jul 2024 22:33:42 +0800 Subject: [PATCH] remove runonce mode --- cmd/kubelet/app/server.go | 21 +- pkg/kubelet/apis/config/types.go | 1 + .../apis/config/validation/validation.go | 3 + pkg/kubelet/kubelet.go | 1 - pkg/kubelet/runonce.go | 176 ----------------- pkg/kubelet/runonce_test.go | 184 ------------------ pkg/kubemark/hollow_kubelet.go | 2 +- 7 files changed, 10 insertions(+), 378 deletions(-) delete mode 100644 pkg/kubelet/runonce.go delete mode 100644 pkg/kubelet/runonce_test.go diff --git a/cmd/kubelet/app/server.go b/cmd/kubelet/app/server.go index 4eae74a54d4..cfb8d3f37bc 100644 --- a/cmd/kubelet/app/server.go +++ b/cmd/kubelet/app/server.go @@ -901,7 +901,7 @@ func run(ctx context.Context, s *options.KubeletServer, kubeDeps *kubelet.Depend klog.InfoS("Failed to ApplyOOMScoreAdj", "err", err) } - if err := RunKubelet(ctx, s, kubeDeps, s.RunOnce); err != nil { + if err := RunKubelet(ctx, s, kubeDeps); err != nil { return err } @@ -916,10 +916,6 @@ func run(ctx context.Context, s *options.KubeletServer, kubeDeps *kubelet.Depend }, 5*time.Second, wait.NeverStop) } - if s.RunOnce { - return nil - } - // If systemd is used, notify it that we have started go daemon.SdNotify(false, "READY=1") @@ -1232,7 +1228,7 @@ func setContentTypeForClient(cfg *restclient.Config, contentType string) { // 3 Standalone 'kubernetes' binary // // Eventually, #2 will be replaced with instances of #3 -func RunKubelet(ctx context.Context, kubeServer *options.KubeletServer, kubeDeps *kubelet.Dependencies, runOnce bool) error { +func RunKubelet(ctx context.Context, kubeServer *options.KubeletServer, kubeDeps *kubelet.Dependencies) error { hostname, err := nodeutil.GetHostname(kubeServer.HostnameOverride) if err != nil { return err @@ -1286,16 +1282,9 @@ func RunKubelet(ctx context.Context, kubeServer *options.KubeletServer, kubeDeps klog.ErrorS(err, "Failed to set rlimit on max file handles") } - // process pods and exit. - if runOnce { - if _, err := k.RunOnce(podCfg.Updates()); err != nil { - return fmt.Errorf("runonce failed: %w", err) - } - klog.InfoS("Started kubelet as runonce") - } else { - startKubelet(k, podCfg, &kubeServer.KubeletConfiguration, kubeDeps, kubeServer.EnableServer) - klog.InfoS("Started kubelet") - } + startKubelet(k, podCfg, &kubeServer.KubeletConfiguration, kubeDeps, kubeServer.EnableServer) + klog.InfoS("Started kubelet") + return nil } diff --git a/pkg/kubelet/apis/config/types.go b/pkg/kubelet/apis/config/types.go index 4d76687dd05..afc3606ef65 100644 --- a/pkg/kubelet/apis/config/types.go +++ b/pkg/kubelet/apis/config/types.go @@ -285,6 +285,7 @@ type KubeletConfiguration struct { ResolverConfig string // RunOnce causes the Kubelet to check the API server once for pods, // run those in addition to the pods specified by static pod files, and exit. + // Deprecated: no longer has any effect. RunOnce bool // cpuCFSQuota enables CPU CFS quota enforcement for containers that // specify CPU limits diff --git a/pkg/kubelet/apis/config/validation/validation.go b/pkg/kubelet/apis/config/validation/validation.go index afc4735daac..8c9fa35ff75 100644 --- a/pkg/kubelet/apis/config/validation/validation.go +++ b/pkg/kubelet/apis/config/validation/validation.go @@ -140,6 +140,9 @@ func ValidateKubeletConfiguration(kc *kubeletconfig.KubeletConfiguration, featur if kc.ServerTLSBootstrap && !localFeatureGate.Enabled(features.RotateKubeletServerCertificate) { allErrors = append(allErrors, fmt.Errorf("invalid configuration: serverTLSBootstrap %v requires feature gate RotateKubeletServerCertificate", kc.ServerTLSBootstrap)) } + if kc.RunOnce { + allErrors = append(allErrors, fmt.Errorf("invalid configuration: runOnce (--runOnce) %v, Runonce mode has been deprecated and should not be set", kc.RunOnce)) + } for _, nodeTaint := range kc.RegisterWithTaints { if err := utiltaints.CheckTaintValidation(nodeTaint); err != nil { diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index 34c17ccab7d..df51a791612 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -278,7 +278,6 @@ type Bootstrap interface { ListenAndServeReadOnly(address net.IP, port uint, tp trace.TracerProvider) ListenAndServePodResources() Run(<-chan kubetypes.PodUpdate) - RunOnce(<-chan kubetypes.PodUpdate) ([]RunPodResult, error) } // Dependencies is a bin for things we might consider "injected dependencies" -- objects constructed diff --git a/pkg/kubelet/runonce.go b/pkg/kubelet/runonce.go deleted file mode 100644 index 448df444869..00000000000 --- a/pkg/kubelet/runonce.go +++ /dev/null @@ -1,176 +0,0 @@ -/* -Copyright 2014 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package kubelet - -import ( - "context" - "fmt" - "os" - "time" - - v1 "k8s.io/api/core/v1" - "k8s.io/klog/v2" - kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" - kubetypes "k8s.io/kubernetes/pkg/kubelet/types" - "k8s.io/kubernetes/pkg/kubelet/util/format" -) - -const ( - runOnceManifestDelay = 1 * time.Second - runOnceMaxRetries = 10 - runOnceRetryDelay = 1 * time.Second - runOnceRetryDelayBackoff = 2 -) - -// RunPodResult defines the running results of a Pod. -type RunPodResult struct { - Pod *v1.Pod - Err error -} - -// RunOnce polls from one configuration update and run the associated pods. -func (kl *Kubelet) RunOnce(updates <-chan kubetypes.PodUpdate) ([]RunPodResult, error) { - ctx := context.Background() - // Setup filesystem directories. - if err := kl.setupDataDirs(); err != nil { - return nil, err - } - - // If the container logs directory does not exist, create it. - if _, err := os.Stat(ContainerLogsDir); err != nil { - if err := kl.os.MkdirAll(ContainerLogsDir, 0755); err != nil { - klog.ErrorS(err, "Failed to create directory", "path", ContainerLogsDir) - } - } - - select { - case u := <-updates: - klog.InfoS("Processing manifest with pods", "numPods", len(u.Pods)) - result, err := kl.runOnce(ctx, u.Pods, runOnceRetryDelay) - klog.InfoS("Finished processing pods", "numPods", len(u.Pods)) - return result, err - case <-time.After(runOnceManifestDelay): - return nil, fmt.Errorf("no pod manifest update after %v", runOnceManifestDelay) - } -} - -// runOnce runs a given set of pods and returns their status. -func (kl *Kubelet) runOnce(ctx context.Context, pods []*v1.Pod, retryDelay time.Duration) (results []RunPodResult, err error) { - ch := make(chan RunPodResult) - admitted := []*v1.Pod{} - for _, pod := range pods { - // Check if we can admit the pod. - if ok, reason, message := kl.canAdmitPod(admitted, pod); !ok { - kl.rejectPod(pod, reason, message) - results = append(results, RunPodResult{pod, nil}) - continue - } - - admitted = append(admitted, pod) - go func(pod *v1.Pod) { - err := kl.runPod(ctx, pod, retryDelay) - ch <- RunPodResult{pod, err} - }(pod) - } - - klog.InfoS("Waiting for pods", "numPods", len(admitted)) - failedPods := []string{} - for i := 0; i < len(admitted); i++ { - res := <-ch - results = append(results, res) - if res.Err != nil { - failedContainerName, err := kl.getFailedContainers(ctx, res.Pod) - if err != nil { - klog.InfoS("Unable to get failed containers' names for pod", "pod", klog.KObj(res.Pod), "err", err) - } else { - klog.InfoS("Unable to start pod because container failed", "pod", klog.KObj(res.Pod), "containerName", failedContainerName) - } - failedPods = append(failedPods, format.Pod(res.Pod)) - } else { - klog.InfoS("Started pod", "pod", klog.KObj(res.Pod)) - } - } - if len(failedPods) > 0 { - return results, fmt.Errorf("error running pods: %v", failedPods) - } - klog.InfoS("Pods started", "numPods", len(pods)) - return results, err -} - -// runPod runs a single pod and waits until all containers are running. -func (kl *Kubelet) runPod(ctx context.Context, pod *v1.Pod, retryDelay time.Duration) error { - var isTerminal bool - delay := retryDelay - retry := 0 - for !isTerminal { - status, err := kl.containerRuntime.GetPodStatus(ctx, pod.UID, pod.Name, pod.Namespace) - if err != nil { - return fmt.Errorf("unable to get status for pod %q: %v", format.Pod(pod), err) - } - - if kl.isPodRunning(pod, status) { - klog.InfoS("Pod's containers running", "pod", klog.KObj(pod)) - return nil - } - klog.InfoS("Pod's containers not running: syncing", "pod", klog.KObj(pod)) - - klog.InfoS("Creating a mirror pod for static pod", "pod", klog.KObj(pod)) - if err := kl.mirrorPodClient.CreateMirrorPod(pod); err != nil { - klog.ErrorS(err, "Failed creating a mirror pod", "pod", klog.KObj(pod)) - } - mirrorPod, _ := kl.podManager.GetMirrorPodByPod(pod) - if isTerminal, err = kl.SyncPod(ctx, kubetypes.SyncPodUpdate, pod, mirrorPod, status); err != nil { - return fmt.Errorf("error syncing pod %q: %v", format.Pod(pod), err) - } - if retry >= runOnceMaxRetries { - return fmt.Errorf("timeout error: pod %q containers not running after %d retries", format.Pod(pod), runOnceMaxRetries) - } - // TODO(proppy): health checking would be better than waiting + checking the state at the next iteration. - klog.InfoS("Pod's containers synced, waiting", "pod", klog.KObj(pod), "duration", delay) - time.Sleep(delay) - retry++ - delay *= runOnceRetryDelayBackoff - } - return nil -} - -// isPodRunning returns true if all containers of a manifest are running. -func (kl *Kubelet) isPodRunning(pod *v1.Pod, status *kubecontainer.PodStatus) bool { - for _, c := range pod.Spec.Containers { - cs := status.FindContainerStatusByName(c.Name) - if cs == nil || cs.State != kubecontainer.ContainerStateRunning { - klog.InfoS("Container not running", "pod", klog.KObj(pod), "containerName", c.Name) - return false - } - } - return true -} - -// getFailedContainer returns failed container name for pod. -func (kl *Kubelet) getFailedContainers(ctx context.Context, pod *v1.Pod) ([]string, error) { - status, err := kl.containerRuntime.GetPodStatus(ctx, pod.UID, pod.Name, pod.Namespace) - if err != nil { - return nil, fmt.Errorf("unable to get status for pod %q: %v", format.Pod(pod), err) - } - var containerNames []string - for _, cs := range status.ContainerStatuses { - if cs.State != kubecontainer.ContainerStateRunning && cs.ExitCode != 0 { - containerNames = append(containerNames, cs.Name) - } - } - return containerNames, nil -} diff --git a/pkg/kubelet/runonce_test.go b/pkg/kubelet/runonce_test.go deleted file mode 100644 index 55a407309c1..00000000000 --- a/pkg/kubelet/runonce_test.go +++ /dev/null @@ -1,184 +0,0 @@ -/* -Copyright 2014 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package kubelet - -import ( - "context" - "os" - "path/filepath" - "testing" - "time" - - cadvisorapi "github.com/google/cadvisor/info/v1" - cadvisorapiv2 "github.com/google/cadvisor/info/v2" - "k8s.io/mount-utils" - - v1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/types" - "k8s.io/client-go/kubernetes/fake" - "k8s.io/client-go/tools/record" - utiltesting "k8s.io/client-go/util/testing" - cadvisortest "k8s.io/kubernetes/pkg/kubelet/cadvisor/testing" - "k8s.io/kubernetes/pkg/kubelet/clustertrustbundle" - "k8s.io/kubernetes/pkg/kubelet/cm" - "k8s.io/kubernetes/pkg/kubelet/configmap" - kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" - containertest "k8s.io/kubernetes/pkg/kubelet/container/testing" - "k8s.io/kubernetes/pkg/kubelet/eviction" - kubepod "k8s.io/kubernetes/pkg/kubelet/pod" - podtest "k8s.io/kubernetes/pkg/kubelet/pod/testing" - "k8s.io/kubernetes/pkg/kubelet/secret" - "k8s.io/kubernetes/pkg/kubelet/server/stats" - "k8s.io/kubernetes/pkg/kubelet/status" - statustest "k8s.io/kubernetes/pkg/kubelet/status/testing" - kubeletutil "k8s.io/kubernetes/pkg/kubelet/util" - "k8s.io/kubernetes/pkg/kubelet/volumemanager" - "k8s.io/kubernetes/pkg/volume" - volumetest "k8s.io/kubernetes/pkg/volume/testing" - "k8s.io/kubernetes/pkg/volume/util/hostutil" - "k8s.io/utils/clock" -) - -func TestRunOnce(t *testing.T) { - ctx := context.Background() - - cadvisor := cadvisortest.NewMockInterface(t) - cadvisor.EXPECT().MachineInfo().Return(&cadvisorapi.MachineInfo{}, nil).Maybe() - cadvisor.EXPECT().ImagesFsInfo(ctx).Return(cadvisorapiv2.FsInfo{ - Usage: 400, - Capacity: 1000, - Available: 600, - }, nil).Maybe() - cadvisor.EXPECT().RootFsInfo().Return(cadvisorapiv2.FsInfo{ - Usage: 9, - Capacity: 10, - }, nil).Maybe() - fakeSecretManager := secret.NewFakeManager() - fakeConfigMapManager := configmap.NewFakeManager() - clusterTrustBundleManager := &clustertrustbundle.NoopManager{} - podManager := kubepod.NewBasicPodManager() - fakeRuntime := &containertest.FakeRuntime{} - podStartupLatencyTracker := kubeletutil.NewPodStartupLatencyTracker() - basePath, err := utiltesting.MkTmpdir("kubelet") - if err != nil { - t.Fatalf("can't make a temp rootdir %v", err) - } - defer os.RemoveAll(basePath) - kb := &Kubelet{ - rootDirectory: filepath.Clean(basePath), - podLogsDirectory: filepath.Join(basePath, "pod-logs"), - recorder: &record.FakeRecorder{}, - cadvisor: cadvisor, - nodeLister: testNodeLister{}, - statusManager: status.NewManager(nil, podManager, &statustest.FakePodDeletionSafetyProvider{}, podStartupLatencyTracker, basePath), - mirrorPodClient: podtest.NewFakeMirrorClient(), - podManager: podManager, - podWorkers: &fakePodWorkers{}, - os: &containertest.FakeOS{}, - containerRuntime: fakeRuntime, - reasonCache: NewReasonCache(), - clock: clock.RealClock{}, - kubeClient: &fake.Clientset{}, - hostname: testKubeletHostname, - nodeName: testKubeletHostname, - runtimeState: newRuntimeState(time.Second), - hostutil: hostutil.NewFakeHostUtil(nil), - } - kb.containerManager = cm.NewStubContainerManager() - - plug := &volumetest.FakeVolumePlugin{PluginName: "fake", Host: nil} - kb.volumePluginMgr, err = - NewInitializedVolumePluginMgr(kb, fakeSecretManager, fakeConfigMapManager, nil, clusterTrustBundleManager, []volume.VolumePlugin{plug}, nil /* prober */) - if err != nil { - t.Fatalf("failed to initialize VolumePluginMgr: %v", err) - } - kb.volumeManager = volumemanager.NewVolumeManager( - true, - kb.nodeName, - kb.podManager, - kb.podWorkers, - kb.kubeClient, - kb.volumePluginMgr, - fakeRuntime, - kb.mounter, - kb.hostutil, - kb.getPodsDir(), - kb.recorder, - volumetest.NewBlockVolumePathHandler()) - - // TODO: Factor out "stats.Provider" from Kubelet so we don't have a cyclic dependency - volumeStatsAggPeriod := time.Second * 10 - kb.resourceAnalyzer = stats.NewResourceAnalyzer(kb, volumeStatsAggPeriod, kb.recorder) - nodeRef := &v1.ObjectReference{ - Kind: "Node", - Name: string(kb.nodeName), - UID: types.UID(kb.nodeName), - Namespace: "", - } - fakeKillPodFunc := func(pod *v1.Pod, evict bool, gracePeriodOverride *int64, fn func(*v1.PodStatus)) error { - return nil - } - evictionManager, evictionAdmitHandler := eviction.NewManager(kb.resourceAnalyzer, eviction.Config{}, fakeKillPodFunc, nil, nil, kb.recorder, nodeRef, kb.clock, kb.supportLocalStorageCapacityIsolation()) - - kb.evictionManager = evictionManager - kb.admitHandlers.AddPodAdmitHandler(evictionAdmitHandler) - kb.mounter = mount.NewFakeMounter(nil) - if err := kb.setupDataDirs(); err != nil { - t.Errorf("Failed to init data dirs: %v", err) - } - - pods := []*v1.Pod{ - { - ObjectMeta: metav1.ObjectMeta{ - UID: "12345678", - Name: "foo", - Namespace: "new", - }, - Spec: v1.PodSpec{ - Containers: []v1.Container{ - {Name: "bar"}, - }, - }, - }, - } - podManager.SetPods(pods) - // The original test here is totally meaningless, because fakeruntime will always return an empty podStatus. While - // the original logic of isPodRunning happens to return true when podstatus is empty, so the test can always pass. - // Now the logic in isPodRunning is changed, to let the test pass, we set the podstatus directly in fake runtime. - // This is also a meaningless test, because the isPodRunning will also always return true after setting this. However, - // because runonce is never used in kubernetes now, we should deprioritize the cleanup work. - // TODO(random-liu) Fix the test, make it meaningful. - fakeRuntime.PodStatus = kubecontainer.PodStatus{ - ContainerStatuses: []*kubecontainer.Status{ - { - Name: "bar", - State: kubecontainer.ContainerStateRunning, - }, - }, - } - results, err := kb.runOnce(ctx, pods, time.Millisecond) - if err != nil { - t.Errorf("unexpected error: %v", err) - } - if results[0].Err != nil { - t.Errorf("unexpected run pod error: %v", results[0].Err) - } - if results[0].Pod.Name != "foo" { - t.Errorf("unexpected pod: %q", results[0].Pod.Name) - } -} diff --git a/pkg/kubemark/hollow_kubelet.go b/pkg/kubemark/hollow_kubelet.go index 71808848a05..523b6fcd65f 100644 --- a/pkg/kubemark/hollow_kubelet.go +++ b/pkg/kubemark/hollow_kubelet.go @@ -127,7 +127,7 @@ func (hk *HollowKubelet) Run(ctx context.Context) { if err := kubeletapp.RunKubelet(ctx, &options.KubeletServer{ KubeletFlags: *hk.KubeletFlags, KubeletConfiguration: *hk.KubeletConfiguration, - }, hk.KubeletDeps, false); err != nil { + }, hk.KubeletDeps); err != nil { klog.Fatalf("Failed to run HollowKubelet: %v. Exiting.", err) } select {}