Mirror of https://github.com/k3s-io/kubernetes.git (synced 2025-07-30 06:54:01 +00:00)

Merge pull request #126336 from HirazawaUi/remove-runonce-mode

Kubelet: Remove runonce mode

Commit: 1ce20b2b6f
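No longer description was carried over to this mirror, so in brief what the hunks below do: the runOnce parameter is dropped from RunKubelet, KubeletConfiguration.RunOnce becomes a deprecated field that fails validation when set, RunOnce leaves the Bootstrap interface, and the run-once implementation file plus its test are deleted outright. As a caller-side sketch of the signature change, with stand-in types (only the call shape mirrors the real RunKubelet; the surrounding program is invented for illustration):

package main

import (
	"context"
	"fmt"
)

// Stand-ins for options.KubeletServer and kubelet.Dependencies; only the
// call shape of the changed function is modeled here.
type kubeletServer struct{}
type dependencies struct{}

// runKubelet mirrors the new RunKubelet signature: no trailing runOnce bool,
// so the kubelet always takes the long-running daemon path.
func runKubelet(ctx context.Context, s *kubeletServer, deps *dependencies) error {
	fmt.Println("starting long-lived kubelet")
	return nil
}

func main() {
	// Old call shape (removed by this PR): runKubelet(ctx, s, deps, s.RunOnce)
	if err := runKubelet(context.Background(), &kubeletServer{}, &dependencies{}); err != nil {
		fmt.Println("failed:", err)
	}
}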
@@ -901,7 +901,7 @@ func run(ctx context.Context, s *options.KubeletServer, kubeDeps *kubelet.Depend
 		klog.InfoS("Failed to ApplyOOMScoreAdj", "err", err)
 	}
 
-	if err := RunKubelet(ctx, s, kubeDeps, s.RunOnce); err != nil {
+	if err := RunKubelet(ctx, s, kubeDeps); err != nil {
 		return err
 	}
 
@@ -916,10 +916,6 @@ func run(ctx context.Context, s *options.KubeletServer, kubeDeps *kubelet.Depend
 		}, 5*time.Second, wait.NeverStop)
 	}
 
-	if s.RunOnce {
-		return nil
-	}
-
 	// If systemd is used, notify it that we have started
 	go daemon.SdNotify(false, "READY=1")
 
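With the early return-for-run-once gone, every successful startup now reaches the systemd readiness notification kept above. For reference, a minimal standalone sketch of the same call, assuming the github.com/coreos/go-systemd/v22/daemon package (what the kubelet vendors); SdNotify is a harmless no-op when NOTIFY_SOCKET is unset:

package main

import (
	"fmt"

	"github.com/coreos/go-systemd/v22/daemon"
)

func main() {
	// Fire-and-forget readiness signal, as the kubelet does in a goroutine.
	// Outside systemd (no NOTIFY_SOCKET) this returns sent == false, err == nil.
	sent, err := daemon.SdNotify(false, "READY=1")
	fmt.Printf("notified systemd: %v (err: %v)\n", sent, err)
}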
@@ -1232,7 +1228,7 @@ func setContentTypeForClient(cfg *restclient.Config, contentType string) {
 // 3 Standalone 'kubernetes' binary
 //
 // Eventually, #2 will be replaced with instances of #3
-func RunKubelet(ctx context.Context, kubeServer *options.KubeletServer, kubeDeps *kubelet.Dependencies, runOnce bool) error {
+func RunKubelet(ctx context.Context, kubeServer *options.KubeletServer, kubeDeps *kubelet.Dependencies) error {
 	hostname, err := nodeutil.GetHostname(kubeServer.HostnameOverride)
 	if err != nil {
 		return err
@@ -1286,16 +1282,9 @@ func RunKubelet(ctx context.Context, kubeServer *options.KubeletServer, kubeDeps
 		klog.ErrorS(err, "Failed to set rlimit on max file handles")
 	}
 
-	// process pods and exit.
-	if runOnce {
-		if _, err := k.RunOnce(podCfg.Updates()); err != nil {
-			return fmt.Errorf("runonce failed: %w", err)
-		}
-		klog.InfoS("Started kubelet as runonce")
-	} else {
-		startKubelet(k, podCfg, &kubeServer.KubeletConfiguration, kubeDeps, kubeServer.EnableServer)
-		klog.InfoS("Started kubelet")
-	}
+	startKubelet(k, podCfg, &kubeServer.KubeletConfiguration, kubeDeps, kubeServer.EnableServer)
+	klog.InfoS("Started kubelet")
 
 	return nil
 }
@@ -285,6 +285,7 @@ type KubeletConfiguration struct {
 	ResolverConfig string
 	// RunOnce causes the Kubelet to check the API server once for pods,
 	// run those in addition to the pods specified by static pod files, and exit.
+	// Deprecated: no longer has any effect.
 	RunOnce bool
 	// cpuCFSQuota enables CPU CFS quota enforcement for containers that
 	// specify CPU limits
@@ -140,6 +140,9 @@ func ValidateKubeletConfiguration(kc *kubeletconfig.KubeletConfiguration, featur
 	if kc.ServerTLSBootstrap && !localFeatureGate.Enabled(features.RotateKubeletServerCertificate) {
 		allErrors = append(allErrors, fmt.Errorf("invalid configuration: serverTLSBootstrap %v requires feature gate RotateKubeletServerCertificate", kc.ServerTLSBootstrap))
 	}
+	if kc.RunOnce {
+		allErrors = append(allErrors, fmt.Errorf("invalid configuration: runOnce (--runOnce) %v, Runonce mode has been deprecated and should not be set", kc.RunOnce))
+	}
 
 	for _, nodeTaint := range kc.RegisterWithTaints {
 		if err := utiltaints.CheckTaintValidation(nodeTaint); err != nil {
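The added check turns a set RunOnce field into a hard configuration error rather than silently ignoring it. A self-contained sketch of the rule, with a trimmed stand-in for the real KubeletConfiguration (the real code accumulates into allErrors alongside its other checks):

package main

import (
	"errors"
	"fmt"
)

// Trimmed stand-in for kubeletconfig.KubeletConfiguration; only the field
// relevant to this PR is modeled.
type kubeletConfiguration struct {
	RunOnce bool
}

// validate mirrors the check added to ValidateKubeletConfiguration:
// a truthy RunOnce is rejected outright.
func validate(kc *kubeletConfiguration) error {
	var allErrors []error
	if kc.RunOnce {
		allErrors = append(allErrors, fmt.Errorf("invalid configuration: runOnce (--runOnce) %v, Runonce mode has been deprecated and should not be set", kc.RunOnce))
	}
	return errors.Join(allErrors...)
}

func main() {
	fmt.Println(validate(&kubeletConfiguration{RunOnce: true}))  // rejected
	fmt.Println(validate(&kubeletConfiguration{RunOnce: false})) // <nil>
}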
@@ -278,7 +278,6 @@ type Bootstrap interface {
 	ListenAndServeReadOnly(address net.IP, port uint, tp trace.TracerProvider)
 	ListenAndServePodResources()
 	Run(<-chan kubetypes.PodUpdate)
-	RunOnce(<-chan kubetypes.PodUpdate) ([]RunPodResult, error)
 }
 
 // Dependencies is a bin for things we might consider "injected dependencies" -- objects constructed
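Dropping RunOnce from Bootstrap narrows the contract: even if some concrete type kept such a method, callers holding the interface can no longer reach it. A tiny illustration of that property, with types invented for the example:

package main

import "fmt"

// bootstrap mimics the narrowed interface: Run stays, RunOnce is gone.
type bootstrap interface {
	Run(updates <-chan string)
}

type kubelet struct{}

func (k *kubelet) Run(updates <-chan string) { fmt.Println("running:", <-updates) }

// RunOnce may still exist on a concrete type, but it is unreachable through
// the interface, so interface-typed callers cannot depend on it.
func (k *kubelet) RunOnce(updates <-chan string) { fmt.Println("run once:", <-updates) }

func main() {
	var b bootstrap = &kubelet{}
	ch := make(chan string, 1)
	ch <- "pods"
	b.Run(ch)
	// b.RunOnce(ch) // compile error: b.RunOnce undefined
}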
@@ -1,176 +0,0 @@
-/*
-Copyright 2014 The Kubernetes Authors.
-
-Licensed under the Apache License, Version 2.0 (the "License");
-you may not use this file except in compliance with the License.
-You may obtain a copy of the License at
-
-    http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-*/
-
-package kubelet
-
-import (
-	"context"
-	"fmt"
-	"os"
-	"time"
-
-	v1 "k8s.io/api/core/v1"
-	"k8s.io/klog/v2"
-	kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
-	kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
-	"k8s.io/kubernetes/pkg/kubelet/util/format"
-)
-
-const (
-	runOnceManifestDelay     = 1 * time.Second
-	runOnceMaxRetries        = 10
-	runOnceRetryDelay        = 1 * time.Second
-	runOnceRetryDelayBackoff = 2
-)
-
-// RunPodResult defines the running results of a Pod.
-type RunPodResult struct {
-	Pod *v1.Pod
-	Err error
-}
-
-// RunOnce polls from one configuration update and run the associated pods.
-func (kl *Kubelet) RunOnce(updates <-chan kubetypes.PodUpdate) ([]RunPodResult, error) {
-	ctx := context.Background()
-	// Setup filesystem directories.
-	if err := kl.setupDataDirs(); err != nil {
-		return nil, err
-	}
-
-	// If the container logs directory does not exist, create it.
-	if _, err := os.Stat(ContainerLogsDir); err != nil {
-		if err := kl.os.MkdirAll(ContainerLogsDir, 0755); err != nil {
-			klog.ErrorS(err, "Failed to create directory", "path", ContainerLogsDir)
-		}
-	}
-
-	select {
-	case u := <-updates:
-		klog.InfoS("Processing manifest with pods", "numPods", len(u.Pods))
-		result, err := kl.runOnce(ctx, u.Pods, runOnceRetryDelay)
-		klog.InfoS("Finished processing pods", "numPods", len(u.Pods))
-		return result, err
-	case <-time.After(runOnceManifestDelay):
-		return nil, fmt.Errorf("no pod manifest update after %v", runOnceManifestDelay)
-	}
-}
-
-// runOnce runs a given set of pods and returns their status.
-func (kl *Kubelet) runOnce(ctx context.Context, pods []*v1.Pod, retryDelay time.Duration) (results []RunPodResult, err error) {
-	ch := make(chan RunPodResult)
-	admitted := []*v1.Pod{}
-	for _, pod := range pods {
-		// Check if we can admit the pod.
-		if ok, reason, message := kl.canAdmitPod(admitted, pod); !ok {
-			kl.rejectPod(pod, reason, message)
-			results = append(results, RunPodResult{pod, nil})
-			continue
-		}
-
-		admitted = append(admitted, pod)
-		go func(pod *v1.Pod) {
-			err := kl.runPod(ctx, pod, retryDelay)
-			ch <- RunPodResult{pod, err}
-		}(pod)
-	}
-
-	klog.InfoS("Waiting for pods", "numPods", len(admitted))
-	failedPods := []string{}
-	for i := 0; i < len(admitted); i++ {
-		res := <-ch
-		results = append(results, res)
-		if res.Err != nil {
-			failedContainerName, err := kl.getFailedContainers(ctx, res.Pod)
-			if err != nil {
-				klog.InfoS("Unable to get failed containers' names for pod", "pod", klog.KObj(res.Pod), "err", err)
-			} else {
-				klog.InfoS("Unable to start pod because container failed", "pod", klog.KObj(res.Pod), "containerName", failedContainerName)
-			}
-			failedPods = append(failedPods, format.Pod(res.Pod))
-		} else {
-			klog.InfoS("Started pod", "pod", klog.KObj(res.Pod))
-		}
-	}
-	if len(failedPods) > 0 {
-		return results, fmt.Errorf("error running pods: %v", failedPods)
-	}
-	klog.InfoS("Pods started", "numPods", len(pods))
-	return results, err
-}
-
-// runPod runs a single pod and waits until all containers are running.
-func (kl *Kubelet) runPod(ctx context.Context, pod *v1.Pod, retryDelay time.Duration) error {
-	var isTerminal bool
-	delay := retryDelay
-	retry := 0
-	for !isTerminal {
-		status, err := kl.containerRuntime.GetPodStatus(ctx, pod.UID, pod.Name, pod.Namespace)
-		if err != nil {
-			return fmt.Errorf("unable to get status for pod %q: %v", format.Pod(pod), err)
-		}
-
-		if kl.isPodRunning(pod, status) {
-			klog.InfoS("Pod's containers running", "pod", klog.KObj(pod))
-			return nil
-		}
-		klog.InfoS("Pod's containers not running: syncing", "pod", klog.KObj(pod))
-
-		klog.InfoS("Creating a mirror pod for static pod", "pod", klog.KObj(pod))
-		if err := kl.mirrorPodClient.CreateMirrorPod(pod); err != nil {
-			klog.ErrorS(err, "Failed creating a mirror pod", "pod", klog.KObj(pod))
-		}
-		mirrorPod, _ := kl.podManager.GetMirrorPodByPod(pod)
-		if isTerminal, err = kl.SyncPod(ctx, kubetypes.SyncPodUpdate, pod, mirrorPod, status); err != nil {
-			return fmt.Errorf("error syncing pod %q: %v", format.Pod(pod), err)
-		}
-		if retry >= runOnceMaxRetries {
-			return fmt.Errorf("timeout error: pod %q containers not running after %d retries", format.Pod(pod), runOnceMaxRetries)
-		}
-		// TODO(proppy): health checking would be better than waiting + checking the state at the next iteration.
-		klog.InfoS("Pod's containers synced, waiting", "pod", klog.KObj(pod), "duration", delay)
-		time.Sleep(delay)
-		retry++
-		delay *= runOnceRetryDelayBackoff
-	}
-	return nil
-}
-
-// isPodRunning returns true if all containers of a manifest are running.
-func (kl *Kubelet) isPodRunning(pod *v1.Pod, status *kubecontainer.PodStatus) bool {
-	for _, c := range pod.Spec.Containers {
-		cs := status.FindContainerStatusByName(c.Name)
-		if cs == nil || cs.State != kubecontainer.ContainerStateRunning {
-			klog.InfoS("Container not running", "pod", klog.KObj(pod), "containerName", c.Name)
-			return false
-		}
-	}
-	return true
-}
-
-// getFailedContainer returns failed container name for pod.
-func (kl *Kubelet) getFailedContainers(ctx context.Context, pod *v1.Pod) ([]string, error) {
-	status, err := kl.containerRuntime.GetPodStatus(ctx, pod.UID, pod.Name, pod.Namespace)
-	if err != nil {
-		return nil, fmt.Errorf("unable to get status for pod %q: %v", format.Pod(pod), err)
-	}
-	var containerNames []string
-	for _, cs := range status.ContainerStatuses {
-		if cs.State != kubecontainer.ContainerStateRunning && cs.ExitCode != 0 {
-			containerNames = append(containerNames, cs.Name)
-		}
-	}
-	return containerNames, nil
-}
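For the record, the deleted RunOnce/runOnce pair implemented a one-shot fan-out: one goroutine per admitted pod, results collected over an unbuffered channel. A distilled, self-contained version of that pattern, with local stand-in types and no kubelet internals:

package main

import (
	"fmt"
	"time"
)

type pod struct{ name string }

// runPodResult mirrors the deleted RunPodResult: a pod plus its run error.
type runPodResult struct {
	pod *pod
	err error
}

// runAll reproduces the fan-out/fan-in shape of the deleted runOnce: one
// goroutine per pod, results gathered from a shared unbuffered channel.
func runAll(pods []*pod) []runPodResult {
	ch := make(chan runPodResult)
	for _, p := range pods {
		go func(p *pod) {
			time.Sleep(10 * time.Millisecond) // stand-in for runPod's sync-and-retry loop
			ch <- runPodResult{p, nil}
		}(p)
	}
	results := make([]runPodResult, 0, len(pods))
	for range pods {
		results = append(results, <-ch)
	}
	return results
}

func main() {
	for _, r := range runAll([]*pod{{name: "foo"}, {name: "bar"}}) {
		fmt.Printf("%s: err=%v\n", r.pod.name, r.err)
	}
}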
@@ -1,184 +0,0 @@
-/*
-Copyright 2014 The Kubernetes Authors.
-
-Licensed under the Apache License, Version 2.0 (the "License");
-you may not use this file except in compliance with the License.
-You may obtain a copy of the License at
-
-    http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-*/
-
-package kubelet
-
-import (
-	"context"
-	"os"
-	"path/filepath"
-	"testing"
-	"time"
-
-	cadvisorapi "github.com/google/cadvisor/info/v1"
-	cadvisorapiv2 "github.com/google/cadvisor/info/v2"
-	"k8s.io/mount-utils"
-
-	v1 "k8s.io/api/core/v1"
-	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
-	"k8s.io/apimachinery/pkg/types"
-	"k8s.io/client-go/kubernetes/fake"
-	"k8s.io/client-go/tools/record"
-	utiltesting "k8s.io/client-go/util/testing"
-	cadvisortest "k8s.io/kubernetes/pkg/kubelet/cadvisor/testing"
-	"k8s.io/kubernetes/pkg/kubelet/clustertrustbundle"
-	"k8s.io/kubernetes/pkg/kubelet/cm"
-	"k8s.io/kubernetes/pkg/kubelet/configmap"
-	kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
-	containertest "k8s.io/kubernetes/pkg/kubelet/container/testing"
-	"k8s.io/kubernetes/pkg/kubelet/eviction"
-	kubepod "k8s.io/kubernetes/pkg/kubelet/pod"
-	podtest "k8s.io/kubernetes/pkg/kubelet/pod/testing"
-	"k8s.io/kubernetes/pkg/kubelet/secret"
-	"k8s.io/kubernetes/pkg/kubelet/server/stats"
-	"k8s.io/kubernetes/pkg/kubelet/status"
-	statustest "k8s.io/kubernetes/pkg/kubelet/status/testing"
-	kubeletutil "k8s.io/kubernetes/pkg/kubelet/util"
-	"k8s.io/kubernetes/pkg/kubelet/volumemanager"
-	"k8s.io/kubernetes/pkg/volume"
-	volumetest "k8s.io/kubernetes/pkg/volume/testing"
-	"k8s.io/kubernetes/pkg/volume/util/hostutil"
-	"k8s.io/utils/clock"
-)
-
-func TestRunOnce(t *testing.T) {
-	ctx := context.Background()
-
-	cadvisor := cadvisortest.NewMockInterface(t)
-	cadvisor.EXPECT().MachineInfo().Return(&cadvisorapi.MachineInfo{}, nil).Maybe()
-	cadvisor.EXPECT().ImagesFsInfo(ctx).Return(cadvisorapiv2.FsInfo{
-		Usage:     400,
-		Capacity:  1000,
-		Available: 600,
-	}, nil).Maybe()
-	cadvisor.EXPECT().RootFsInfo().Return(cadvisorapiv2.FsInfo{
-		Usage:    9,
-		Capacity: 10,
-	}, nil).Maybe()
-	fakeSecretManager := secret.NewFakeManager()
-	fakeConfigMapManager := configmap.NewFakeManager()
-	clusterTrustBundleManager := &clustertrustbundle.NoopManager{}
-	podManager := kubepod.NewBasicPodManager()
-	fakeRuntime := &containertest.FakeRuntime{}
-	podStartupLatencyTracker := kubeletutil.NewPodStartupLatencyTracker()
-	basePath, err := utiltesting.MkTmpdir("kubelet")
-	if err != nil {
-		t.Fatalf("can't make a temp rootdir %v", err)
-	}
-	defer os.RemoveAll(basePath)
-	kb := &Kubelet{
-		rootDirectory:    filepath.Clean(basePath),
-		podLogsDirectory: filepath.Join(basePath, "pod-logs"),
-		recorder:         &record.FakeRecorder{},
-		cadvisor:         cadvisor,
-		nodeLister:       testNodeLister{},
-		statusManager:    status.NewManager(nil, podManager, &statustest.FakePodDeletionSafetyProvider{}, podStartupLatencyTracker, basePath),
-		mirrorPodClient:  podtest.NewFakeMirrorClient(),
-		podManager:       podManager,
-		podWorkers:       &fakePodWorkers{},
-		os:               &containertest.FakeOS{},
-		containerRuntime: fakeRuntime,
-		reasonCache:      NewReasonCache(),
-		clock:            clock.RealClock{},
-		kubeClient:       &fake.Clientset{},
-		hostname:         testKubeletHostname,
-		nodeName:         testKubeletHostname,
-		runtimeState:     newRuntimeState(time.Second),
-		hostutil:         hostutil.NewFakeHostUtil(nil),
-	}
-	kb.containerManager = cm.NewStubContainerManager()
-
-	plug := &volumetest.FakeVolumePlugin{PluginName: "fake", Host: nil}
-	kb.volumePluginMgr, err =
-		NewInitializedVolumePluginMgr(kb, fakeSecretManager, fakeConfigMapManager, nil, clusterTrustBundleManager, []volume.VolumePlugin{plug}, nil /* prober */)
-	if err != nil {
-		t.Fatalf("failed to initialize VolumePluginMgr: %v", err)
-	}
-	kb.volumeManager = volumemanager.NewVolumeManager(
-		true,
-		kb.nodeName,
-		kb.podManager,
-		kb.podWorkers,
-		kb.kubeClient,
-		kb.volumePluginMgr,
-		fakeRuntime,
-		kb.mounter,
-		kb.hostutil,
-		kb.getPodsDir(),
-		kb.recorder,
-		volumetest.NewBlockVolumePathHandler())
-
-	// TODO: Factor out "stats.Provider" from Kubelet so we don't have a cyclic dependency
-	volumeStatsAggPeriod := time.Second * 10
-	kb.resourceAnalyzer = stats.NewResourceAnalyzer(kb, volumeStatsAggPeriod, kb.recorder)
-	nodeRef := &v1.ObjectReference{
-		Kind:      "Node",
-		Name:      string(kb.nodeName),
-		UID:       types.UID(kb.nodeName),
-		Namespace: "",
-	}
-	fakeKillPodFunc := func(pod *v1.Pod, evict bool, gracePeriodOverride *int64, fn func(*v1.PodStatus)) error {
-		return nil
-	}
-	evictionManager, evictionAdmitHandler := eviction.NewManager(kb.resourceAnalyzer, eviction.Config{}, fakeKillPodFunc, nil, nil, kb.recorder, nodeRef, kb.clock, kb.supportLocalStorageCapacityIsolation())
-
-	kb.evictionManager = evictionManager
-	kb.admitHandlers.AddPodAdmitHandler(evictionAdmitHandler)
-	kb.mounter = mount.NewFakeMounter(nil)
-	if err := kb.setupDataDirs(); err != nil {
-		t.Errorf("Failed to init data dirs: %v", err)
-	}
-
-	pods := []*v1.Pod{
-		{
-			ObjectMeta: metav1.ObjectMeta{
-				UID:       "12345678",
-				Name:      "foo",
-				Namespace: "new",
-			},
-			Spec: v1.PodSpec{
-				Containers: []v1.Container{
-					{Name: "bar"},
-				},
-			},
-		},
-	}
-	podManager.SetPods(pods)
-	// The original test here is totally meaningless, because fakeruntime will always return an empty podStatus. While
-	// the original logic of isPodRunning happens to return true when podstatus is empty, so the test can always pass.
-	// Now the logic in isPodRunning is changed, to let the test pass, we set the podstatus directly in fake runtime.
-	// This is also a meaningless test, because the isPodRunning will also always return true after setting this. However,
-	// because runonce is never used in kubernetes now, we should deprioritize the cleanup work.
-	// TODO(random-liu) Fix the test, make it meaningful.
-	fakeRuntime.PodStatus = kubecontainer.PodStatus{
-		ContainerStatuses: []*kubecontainer.Status{
-			{
-				Name:  "bar",
-				State: kubecontainer.ContainerStateRunning,
-			},
-		},
-	}
-	results, err := kb.runOnce(ctx, pods, time.Millisecond)
-	if err != nil {
-		t.Errorf("unexpected error: %v", err)
-	}
-	if results[0].Err != nil {
-		t.Errorf("unexpected run pod error: %v", results[0].Err)
-	}
-	if results[0].Pod.Name != "foo" {
-		t.Errorf("unexpected pod: %q", results[0].Pod.Name)
-	}
-}
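The deleted test drove kl.runOnce with a millisecond retry delay; in production the deleted runPod slept with exponential backoff between sync attempts. A standalone sketch of that wait schedule, reusing the constants from the deleted file (the loop structure is a distillation, not the original code):

package main

import (
	"fmt"
	"time"
)

// Constants as they appeared in the deleted run-once implementation.
const (
	runOnceMaxRetries        = 10
	runOnceRetryDelay        = 1 * time.Second
	runOnceRetryDelayBackoff = 2
)

func main() {
	// Reproduces the wait schedule of the deleted runPod: sleep, double the
	// delay, and give up once the retry counter reaches runOnceMaxRetries.
	delay, total := runOnceRetryDelay, time.Duration(0)
	for retry := 0; retry < runOnceMaxRetries; retry++ {
		fmt.Printf("retry %2d: sleep %v\n", retry, delay)
		total += delay
		delay *= runOnceRetryDelayBackoff
	}
	fmt.Println("worst case before timeout:", total) // 1023s with these values
}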
@@ -127,7 +127,7 @@ func (hk *HollowKubelet) Run(ctx context.Context) {
 	if err := kubeletapp.RunKubelet(ctx, &options.KubeletServer{
 		KubeletFlags:         *hk.KubeletFlags,
 		KubeletConfiguration: *hk.KubeletConfiguration,
-	}, hk.KubeletDeps, false); err != nil {
+	}, hk.KubeletDeps); err != nil {
 		klog.Fatalf("Failed to run HollowKubelet: %v. Exiting.", err)
 	}
 	select {}