Merge pull request #91577 from knabben/kubelet-bootstrap

kubelet: remove the --bootstrap-checkpoint-path feature
This commit is contained in:
Kubernetes Prow Robot 2020-07-09 00:03:41 -07:00 committed by GitHub
commit 1e3eeba9fa
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
24 changed files with 25 additions and 505 deletions

View File

@ -137,9 +137,6 @@ type KubeletFlags struct {
ExitOnLockContention bool
// seccompProfileRoot is the directory path for seccomp profiles.
SeccompProfileRoot string
// bootstrapCheckpointPath is the path to the directory containing pod checkpoints to
// run on restore
BootstrapCheckpointPath string
// DEPRECATED FLAGS
// minimumGCAge is the minimum age for a finished container before it is
@ -347,7 +344,6 @@ func (f *KubeletFlags) AddFlags(mainfs *pflag.FlagSet) {
fs.Var(&bindableNodeLabels, "node-labels", fmt.Sprintf("<Warning: Alpha feature> Labels to add when registering the node in the cluster. Labels must be key=value pairs separated by ','. Labels in the 'kubernetes.io' namespace must begin with an allowed prefix (%s) or be in the specifically allowed set (%s)", strings.Join(kubeletapis.KubeletLabelNamespaces(), ", "), strings.Join(kubeletapis.KubeletLabels(), ", ")))
fs.StringVar(&f.LockFilePath, "lock-file", f.LockFilePath, "<Warning: Alpha feature> The path to file for kubelet to use as a lock file.")
fs.BoolVar(&f.ExitOnLockContention, "exit-on-lock-contention", f.ExitOnLockContention, "Whether kubelet should exit upon lock-file contention.")
fs.StringVar(&f.BootstrapCheckpointPath, "bootstrap-checkpoint-path", f.BootstrapCheckpointPath, "<Warning: Alpha feature> Path to the directory where the checkpoints are stored")
// DEPRECATED FLAGS
fs.StringVar(&f.BootstrapKubeconfig, "experimental-bootstrap-kubeconfig", f.BootstrapKubeconfig, "")

View File

@ -1119,7 +1119,6 @@ func RunKubelet(kubeServer *options.KubeletServer, kubeDeps *kubelet.Dependencie
kubeServer.KeepTerminatedPodVolumes,
kubeServer.NodeLabels,
kubeServer.SeccompProfileRoot,
kubeServer.BootstrapCheckpointPath,
kubeServer.NodeStatusMaxImages)
if err != nil {
return fmt.Errorf("failed to create kubelet: %v", err)
@ -1194,7 +1193,6 @@ func createAndInitKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
keepTerminatedPodVolumes bool,
nodeLabels map[string]string,
seccompProfileRoot string,
bootstrapCheckpointPath string,
nodeStatusMaxImages int32) (k kubelet.Bootstrap, err error) {
// TODO: block until all sources have delivered at least one update to the channel, or break the sync loop
// up into "per source" synchronizations
@ -1226,7 +1224,6 @@ func createAndInitKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
keepTerminatedPodVolumes,
nodeLabels,
seccompProfileRoot,
bootstrapCheckpointPath,
nodeStatusMaxImages)
if err != nil {
return nil, err

View File

@ -64,10 +64,6 @@ const (
// This annotation can be attached to node.
ObjectTTLAnnotationKey string = "node.alpha.kubernetes.io/ttl"
// BootstrapCheckpointAnnotationKey represents a Resource (Pod) that should be checkpointed by
// the kubelet prior to running
BootstrapCheckpointAnnotationKey string = "node.kubernetes.io/bootstrap-checkpoint"
// NonConvertibleAnnotationPrefix annotation key prefix used to identify non-convertible json paths.
NonConvertibleAnnotationPrefix = "non-convertible.kubernetes.io"

View File

@ -47,7 +47,6 @@ go_library(
"//pkg/kubelet/apis/podresources:go_default_library",
"//pkg/kubelet/cadvisor:go_default_library",
"//pkg/kubelet/certificate:go_default_library",
"//pkg/kubelet/checkpointmanager:go_default_library",
"//pkg/kubelet/cloudresource:go_default_library",
"//pkg/kubelet/cm:go_default_library",
"//pkg/kubelet/config:go_default_library",
@ -281,7 +280,6 @@ filegroup(
"//pkg/kubelet/apis:all-srcs",
"//pkg/kubelet/cadvisor:all-srcs",
"//pkg/kubelet/certificate:all-srcs",
"//pkg/kubelet/checkpoint:all-srcs",
"//pkg/kubelet/checkpointmanager:all-srcs",
"//pkg/kubelet/client:all-srcs",
"//pkg/kubelet/cloudresource:all-srcs",

View File

@ -1,41 +0,0 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
go_library(
name = "go_default_library",
srcs = ["checkpoint.go"],
importpath = "k8s.io/kubernetes/pkg/kubelet/checkpoint",
visibility = ["//visibility:public"],
deps = [
"//pkg/apis/core:go_default_library",
"//pkg/kubelet/checkpointmanager:go_default_library",
"//pkg/kubelet/checkpointmanager/checksum:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/klog/v2:go_default_library",
],
)
go_test(
name = "go_default_test",
srcs = ["checkpoint_test.go"],
embed = [":go_default_library"],
deps = [
"//pkg/apis/core:go_default_library",
"//pkg/kubelet/checkpointmanager:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
],
)
filegroup(
name = "package-srcs",
srcs = glob(["**"]),
tags = ["automanaged"],
visibility = ["//visibility:private"],
)
filegroup(
name = "all-srcs",
srcs = [":package-srcs"],
tags = ["automanaged"],
visibility = ["//visibility:public"],
)

View File

@ -1,128 +0,0 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package checkpoint
import (
"encoding/json"
"fmt"
"k8s.io/klog/v2"
"k8s.io/api/core/v1"
"k8s.io/kubernetes/pkg/apis/core"
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager"
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager/checksum"
)
const (
// Delimiter used on checkpoints written to disk
delimiter = "_"
podPrefix = "Pod"
)
// PodCheckpoint defines the operations to retrieve pod
type PodCheckpoint interface {
checkpointmanager.Checkpoint
GetPod() *v1.Pod
}
// Data to be stored as checkpoint
type Data struct {
Pod *v1.Pod
Checksum checksum.Checksum
}
// NewPodCheckpoint returns new pod checkpoint
func NewPodCheckpoint(pod *v1.Pod) PodCheckpoint {
return &Data{Pod: pod}
}
// MarshalCheckpoint returns marshalled data
func (cp *Data) MarshalCheckpoint() ([]byte, error) {
cp.Checksum = checksum.New(*cp.Pod)
return json.Marshal(*cp)
}
// UnmarshalCheckpoint returns unmarshalled data
func (cp *Data) UnmarshalCheckpoint(blob []byte) error {
return json.Unmarshal(blob, cp)
}
// VerifyChecksum verifies that passed checksum is same as calculated checksum
func (cp *Data) VerifyChecksum() error {
return cp.Checksum.Verify(*cp.Pod)
}
// GetPod retrieves the pod from the checkpoint
func (cp *Data) GetPod() *v1.Pod {
return cp.Pod
}
// checkAnnotations will validate the checkpoint annotations exist on the Pod
func checkAnnotations(pod *v1.Pod) bool {
if podAnnotations := pod.GetAnnotations(); podAnnotations != nil {
if podAnnotations[core.BootstrapCheckpointAnnotationKey] == "true" {
return true
}
}
return false
}
//getPodKey returns the full qualified path for the pod checkpoint
func getPodKey(pod *v1.Pod) string {
return fmt.Sprintf("%s%s%v.yaml", podPrefix, delimiter, pod.GetUID())
}
// LoadPods Loads All Checkpoints from disk
func LoadPods(cpm checkpointmanager.CheckpointManager) ([]*v1.Pod, error) {
pods := make([]*v1.Pod, 0)
checkpointKeys, err := cpm.ListCheckpoints()
if err != nil {
klog.Errorf("Failed to list checkpoints: %v", err)
}
for _, key := range checkpointKeys {
checkpoint := NewPodCheckpoint(nil)
err := cpm.GetCheckpoint(key, checkpoint)
if err != nil {
klog.Errorf("Failed to retrieve checkpoint for pod %q: %v", key, err)
continue
}
pods = append(pods, checkpoint.GetPod())
}
return pods, nil
}
// WritePod a checkpoint to a file on disk if annotation is present
func WritePod(cpm checkpointmanager.CheckpointManager, pod *v1.Pod) error {
var err error
if checkAnnotations(pod) {
data := NewPodCheckpoint(pod)
err = cpm.CreateCheckpoint(getPodKey(pod), data)
} else {
// This is to handle an edge where a pod update could remove
// an annotation and the checkpoint should then be removed.
err = cpm.RemoveCheckpoint(getPodKey(pod))
}
return err
}
// DeletePod deletes a checkpoint from disk if present
func DeletePod(cpm checkpointmanager.CheckpointManager, pod *v1.Pod) error {
return cpm.RemoveCheckpoint(getPodKey(pod))
}

View File

@ -1,124 +0,0 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package checkpoint
import (
"io/ioutil"
"os"
"reflect"
"testing"
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/kubernetes/pkg/apis/core"
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager"
)
// TestWriteLoadDeletePods validates all combinations of write, load, and delete
func TestWriteLoadDeletePods(t *testing.T) {
testPods := []struct {
pod *v1.Pod
written bool
}{
{
pod: &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "Foo",
Annotations: map[string]string{core.BootstrapCheckpointAnnotationKey: "true"},
UID: "1",
},
},
written: true,
},
{
pod: &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "Foo2",
Annotations: map[string]string{core.BootstrapCheckpointAnnotationKey: "true"},
UID: "2",
},
},
written: true,
},
{
pod: &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "Bar",
UID: "3",
},
},
written: false,
},
}
dir, err := ioutil.TempDir("", "checkpoint")
if err != nil {
t.Errorf("Failed to allocate temp directory for TestWriteLoadDeletePods error=%v", err)
}
defer os.RemoveAll(dir)
cpm, err := checkpointmanager.NewCheckpointManager(dir)
if err != nil {
t.Errorf("Failed to initialize checkpoint manager error=%v", err)
}
for _, p := range testPods {
// Write pods should always pass unless there is an fs error
if err := WritePod(cpm, p.pod); err != nil {
t.Errorf("Failed to Write Pod: %v", err)
}
}
// verify the correct written files are loaded from disk
pods, err := LoadPods(cpm)
if err != nil {
t.Errorf("Failed to Load Pods: %v", err)
}
// loop through contents and check make sure
// what was loaded matched the expected results.
for _, p := range testPods {
pname := p.pod.GetName()
var lpod *v1.Pod
for _, check := range pods {
if check.GetName() == pname {
lpod = check
break
}
}
if p.written {
if lpod != nil {
if !reflect.DeepEqual(p.pod, lpod) {
t.Errorf("expected %#v, \ngot %#v", p.pod, lpod)
}
} else {
t.Errorf("Got unexpected result for %v, should have been loaded", pname)
}
} else if lpod != nil {
t.Errorf("Got unexpected result for %v, should not have been loaded", pname)
}
err = DeletePod(cpm, p.pod)
if err != nil {
t.Errorf("Failed to delete pod %v", pname)
}
}
// finally validate the contents of the directory is empty.
files, err := ioutil.ReadDir(dir)
if err != nil {
t.Errorf("Failed to read directory %v", dir)
}
if len(files) > 0 {
t.Errorf("Directory %v should be empty but found %#v", dir, files)
}
}

View File

@ -25,8 +25,6 @@ go_library(
"//pkg/apis/core/v1:go_default_library",
"//pkg/apis/core/validation:go_default_library",
"//pkg/features:go_default_library",
"//pkg/kubelet/checkpoint:go_default_library",
"//pkg/kubelet/checkpointmanager:go_default_library",
"//pkg/kubelet/container:go_default_library",
"//pkg/kubelet/events:go_default_library",
"//pkg/kubelet/types:go_default_library",
@ -77,8 +75,6 @@ go_test(
"//pkg/apis/core:go_default_library",
"//pkg/apis/core/v1:go_default_library",
"//pkg/apis/core/validation:go_default_library",
"//pkg/kubelet/checkpoint:go_default_library",
"//pkg/kubelet/checkpointmanager:go_default_library",
"//pkg/kubelet/types:go_default_library",
"//pkg/securitycontext:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",

View File

@ -26,8 +26,6 @@ import (
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/tools/record"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/kubelet/checkpoint"
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/events"
kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
@ -63,9 +61,8 @@ type PodConfig struct {
updates chan kubetypes.PodUpdate
// contains the list of all configured sources
sourcesLock sync.Mutex
sources sets.String
checkpointManager checkpointmanager.CheckpointManager
sourcesLock sync.Mutex
sources sets.String
}
// NewPodConfig creates an object that can merge many configuration sources into a stream
@ -111,24 +108,6 @@ func (c *PodConfig) Sync() {
c.pods.Sync()
}
// Restore restores pods from the checkpoint path, *once*
func (c *PodConfig) Restore(path string, updates chan<- interface{}) error {
if c.checkpointManager != nil {
return nil
}
var err error
c.checkpointManager, err = checkpointmanager.NewCheckpointManager(path)
if err != nil {
return err
}
pods, err := checkpoint.LoadPods(c.checkpointManager)
if err != nil {
return err
}
updates <- kubetypes.PodUpdate{Pods: pods, Op: kubetypes.RESTORE, Source: kubetypes.ApiserverSource}
return nil
}
// podStorage manages the current pod state at any point in time and ensures updates
// to the channel are delivered in order. Note that this object is an in-memory source of
// "truth" and on creation contains zero entries. Once all previously read sources are
@ -173,7 +152,7 @@ func (s *podStorage) Merge(source string, change interface{}) error {
defer s.updateLock.Unlock()
seenBefore := s.sourcesSeen.Has(source)
adds, updates, deletes, removes, reconciles, restores := s.merge(source, change)
adds, updates, deletes, removes, reconciles := s.merge(source, change)
firstSet := !seenBefore && s.sourcesSeen.Has(source)
// deliver update notifications
@ -191,9 +170,6 @@ func (s *podStorage) Merge(source string, change interface{}) error {
if len(deletes.Pods) > 0 {
s.updates <- *deletes
}
if len(restores.Pods) > 0 {
s.updates <- *restores
}
if firstSet && len(adds.Pods) == 0 && len(updates.Pods) == 0 && len(deletes.Pods) == 0 {
// Send an empty update when first seeing the source and there are
// no ADD or UPDATE or DELETE pods from the source. This signals kubelet that
@ -230,7 +206,7 @@ func (s *podStorage) Merge(source string, change interface{}) error {
return nil
}
func (s *podStorage) merge(source string, change interface{}) (adds, updates, deletes, removes, reconciles, restores *kubetypes.PodUpdate) {
func (s *podStorage) merge(source string, change interface{}) (adds, updates, deletes, removes, reconciles *kubetypes.PodUpdate) {
s.podLock.Lock()
defer s.podLock.Unlock()
@ -239,7 +215,6 @@ func (s *podStorage) merge(source string, change interface{}) (adds, updates, de
deletePods := []*v1.Pod{}
removePods := []*v1.Pod{}
reconcilePods := []*v1.Pod{}
restorePods := []*v1.Pod{}
pods := s.pods[source]
if pods == nil {
@ -312,9 +287,6 @@ func (s *podStorage) merge(source string, change interface{}) (adds, updates, de
removePods = append(removePods, existing)
}
}
case kubetypes.RESTORE:
klog.V(4).Infof("Restoring pods for source %s", source)
restorePods = append(restorePods, update.Pods...)
default:
klog.Warningf("Received invalid update type: %v", update)
@ -328,9 +300,8 @@ func (s *podStorage) merge(source string, change interface{}) (adds, updates, de
deletes = &kubetypes.PodUpdate{Op: kubetypes.DELETE, Pods: copyPods(deletePods), Source: source}
removes = &kubetypes.PodUpdate{Op: kubetypes.REMOVE, Pods: copyPods(removePods), Source: source}
reconciles = &kubetypes.PodUpdate{Op: kubetypes.RECONCILE, Pods: copyPods(reconcilePods), Source: source}
restores = &kubetypes.PodUpdate{Op: kubetypes.RESTORE, Pods: copyPods(restorePods), Source: source}
return adds, updates, deletes, removes, reconciles, restores
return adds, updates, deletes, removes, reconciles
}
func (s *podStorage) markSourceSet(source string) {

View File

@ -17,9 +17,7 @@ limitations under the License.
package config
import (
"io/ioutil"
"math/rand"
"os"
"reflect"
"sort"
"strconv"
@ -32,9 +30,6 @@ import (
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/tools/record"
"k8s.io/kubernetes/pkg/apis/core"
"k8s.io/kubernetes/pkg/kubelet/checkpoint"
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager"
kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
"k8s.io/kubernetes/pkg/securitycontext"
)
@ -90,14 +85,6 @@ func CreatePodUpdate(op kubetypes.PodOperation, source string, pods ...*v1.Pod)
return kubetypes.PodUpdate{Pods: pods, Op: op, Source: source}
}
func createPodConfigTesterByChannel(mode PodConfigNotificationMode, channelName string) (chan<- interface{}, <-chan kubetypes.PodUpdate, *PodConfig) {
eventBroadcaster := record.NewBroadcaster()
config := NewPodConfig(mode, eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "kubelet"}))
channel := config.Channel(channelName)
ch := config.Updates()
return channel, ch, config
}
func createPodConfigTester(mode PodConfigNotificationMode) (chan<- interface{}, <-chan kubetypes.PodUpdate, *PodConfig) {
eventBroadcaster := record.NewBroadcaster()
config := NewPodConfig(mode, eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "kubelet"}))
@ -426,35 +413,3 @@ func TestPodUpdateLabels(t *testing.T) {
expectPodUpdate(t, ch, CreatePodUpdate(kubetypes.UPDATE, TestSource, pod))
}
func TestPodRestore(t *testing.T) {
tmpDir, _ := ioutil.TempDir("", "")
defer os.RemoveAll(tmpDir)
pod := CreateValidPod("api-server", "kube-default")
pod.Annotations = make(map[string]string)
pod.Annotations["kubernetes.io/config.source"] = kubetypes.ApiserverSource
pod.Annotations[core.BootstrapCheckpointAnnotationKey] = "true"
// Create Checkpointer
checkpointManager, err := checkpointmanager.NewCheckpointManager(tmpDir)
if err != nil {
t.Fatalf("failed to initialize checkpoint manager: %v", err)
}
if err := checkpoint.WritePod(checkpointManager, pod); err != nil {
t.Fatalf("Error writing checkpoint for pod: %v", pod.GetName())
}
// Restore checkpoint
channel, ch, config := createPodConfigTesterByChannel(PodConfigNotificationIncremental, kubetypes.ApiserverSource)
if err := config.Restore(tmpDir, channel); err != nil {
t.Fatalf("Restore returned error: %v", err)
}
expectPodUpdate(t, ch, CreatePodUpdate(kubetypes.RESTORE, kubetypes.ApiserverSource, pod))
// Verify Restore only happen once
if err := config.Restore(tmpDir, channel); err != nil {
t.Fatalf("The second restore returned error: %v", err)
}
expectNoPodUpdate(t, ch)
}

View File

@ -66,7 +66,6 @@ import (
"k8s.io/kubernetes/pkg/kubelet/apis/podresources"
"k8s.io/kubernetes/pkg/kubelet/cadvisor"
kubeletcertificate "k8s.io/kubernetes/pkg/kubelet/certificate"
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager"
"k8s.io/kubernetes/pkg/kubelet/cloudresource"
"k8s.io/kubernetes/pkg/kubelet/cm"
"k8s.io/kubernetes/pkg/kubelet/config"
@ -249,7 +248,7 @@ type DockerOptions struct {
// makePodSourceConfig creates a config.PodConfig from the given
// KubeletConfiguration or returns an error.
func makePodSourceConfig(kubeCfg *kubeletconfiginternal.KubeletConfiguration, kubeDeps *Dependencies, nodeName types.NodeName, bootstrapCheckpointPath string) (*config.PodConfig, error) {
func makePodSourceConfig(kubeCfg *kubeletconfiginternal.KubeletConfiguration, kubeDeps *Dependencies, nodeName types.NodeName) (*config.PodConfig, error) {
manifestURLHeader := make(http.Header)
if len(kubeCfg.StaticPodURLHeader) > 0 {
for k, v := range kubeCfg.StaticPodURLHeader {
@ -274,20 +273,7 @@ func makePodSourceConfig(kubeCfg *kubeletconfiginternal.KubeletConfiguration, ku
config.NewSourceURL(kubeCfg.StaticPodURL, manifestURLHeader, nodeName, kubeCfg.HTTPCheckFrequency.Duration, cfg.Channel(kubetypes.HTTPSource))
}
// Restore from the checkpoint path
// NOTE: This MUST happen before creating the apiserver source
// below, or the checkpoint would override the source of truth.
var updatechannel chan<- interface{}
if bootstrapCheckpointPath != "" {
klog.Infof("Adding checkpoint path: %v", bootstrapCheckpointPath)
updatechannel = cfg.Channel(kubetypes.ApiserverSource)
err := cfg.Restore(bootstrapCheckpointPath, updatechannel)
if err != nil {
return nil, err
}
}
if kubeDeps.KubeClient != nil {
klog.Infof("Watching apiserver")
if updatechannel == nil {
@ -376,7 +362,6 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
keepTerminatedPodVolumes bool,
nodeLabels map[string]string,
seccompProfileRoot string,
bootstrapCheckpointPath string,
nodeStatusMaxImages int32) (*Kubelet, error) {
if rootDirectory == "" {
return nil, fmt.Errorf("invalid root directory %q", rootDirectory)
@ -399,7 +384,7 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
if kubeDeps.PodConfig == nil {
var err error
kubeDeps.PodConfig, err = makePodSourceConfig(kubeCfg, kubeDeps, nodeName, bootstrapCheckpointPath)
kubeDeps.PodConfig, err = makePodSourceConfig(kubeCfg, kubeDeps, nodeName)
if err != nil {
return nil, err
}
@ -586,18 +571,11 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
klet.livenessManager = proberesults.NewManager()
klet.startupManager = proberesults.NewManager()
klet.podCache = kubecontainer.NewCache()
var checkpointManager checkpointmanager.CheckpointManager
if bootstrapCheckpointPath != "" {
checkpointManager, err = checkpointmanager.NewCheckpointManager(bootstrapCheckpointPath)
if err != nil {
return nil, fmt.Errorf("failed to initialize checkpoint manager: %+v", err)
}
}
// podManager is also responsible for keeping secretManager and configMapManager contents up-to-date.
mirrorPodClient := kubepod.NewBasicMirrorClient(klet.kubeClient, string(nodeName), nodeLister)
klet.podManager = kubepod.NewBasicPodManager(mirrorPodClient, secretManager, configMapManager, checkpointManager)
klet.podManager = kubepod.NewBasicPodManager(mirrorPodClient, secretManager, configMapManager)
klet.statusManager = status.NewManager(klet.kubeClient, klet.podManager, klet)
@ -1855,28 +1833,15 @@ func (kl *Kubelet) syncLoopIteration(configCh <-chan kubetypes.PodUpdate, handle
klog.V(2).Infof("SyncLoop (DELETE, %q): %q", u.Source, format.Pods(u.Pods))
// DELETE is treated as a UPDATE because of graceful deletion.
handler.HandlePodUpdates(u.Pods)
case kubetypes.RESTORE:
klog.V(2).Infof("SyncLoop (RESTORE, %q): %q", u.Source, format.Pods(u.Pods))
// These are pods restored from the checkpoint. Treat them as new
// pods.
handler.HandlePodAdditions(u.Pods)
case kubetypes.SET:
// TODO: Do we want to support this?
klog.Errorf("Kubelet does not support snapshot update")
default:
klog.Errorf("Invalid event type received: %d.", u.Op)
}
if u.Op != kubetypes.RESTORE {
// If the update type is RESTORE, it means that the update is from
// the pod checkpoints and may be incomplete. Do not mark the
// source as ready.
kl.sourcesReady.AddSource(u.Source)
// Mark the source ready after receiving at least one update from the
// source. Once all the sources are marked ready, various cleanup
// routines will start reclaiming resources. It is important that this
// takes place only after kubelet calls the update handler to process
// the update to ensure the internal pod cache is up-to-date.
kl.sourcesReady.AddSource(u.Source)
}
case e := <-plegCh:
if isSyncPodWorthy(e) {
// PLEG event for a pod; sync it.

View File

@ -221,7 +221,7 @@ func newTestKubeletWithImageList(
kubelet.secretManager = secretManager
configMapManager := configmap.NewSimpleConfigMapManager(kubelet.kubeClient)
kubelet.configMapManager = configMapManager
kubelet.podManager = kubepod.NewBasicPodManager(fakeMirrorClient, kubelet.secretManager, kubelet.configMapManager, podtest.NewMockCheckpointManager())
kubelet.podManager = kubepod.NewBasicPodManager(fakeMirrorClient, kubelet.secretManager, kubelet.configMapManager)
kubelet.statusManager = status.NewManager(fakeKubeClient, kubelet.podManager, &statustest.FakePodDeletionSafetyProvider{})
kubelet.containerRuntime = fakeRuntime

View File

@ -14,8 +14,6 @@ go_library(
],
importpath = "k8s.io/kubernetes/pkg/kubelet/pod",
deps = [
"//pkg/kubelet/checkpoint:go_default_library",
"//pkg/kubelet/checkpointmanager:go_default_library",
"//pkg/kubelet/configmap:go_default_library",
"//pkg/kubelet/container:go_default_library",
"//pkg/kubelet/secret:go_default_library",

View File

@ -19,12 +19,8 @@ package pod
import (
"sync"
"k8s.io/klog/v2"
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/kubernetes/pkg/kubelet/checkpoint"
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager"
"k8s.io/kubernetes/pkg/kubelet/configmap"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/secret"
@ -120,20 +116,18 @@ type basicManager struct {
translationByUID map[kubetypes.MirrorPodUID]kubetypes.ResolvedPodUID
// basicManager is keeping secretManager and configMapManager up-to-date.
secretManager secret.Manager
configMapManager configmap.Manager
checkpointManager checkpointmanager.CheckpointManager
secretManager secret.Manager
configMapManager configmap.Manager
// A mirror pod client to create/delete mirror pods.
MirrorClient
}
// NewBasicPodManager returns a functional Manager.
func NewBasicPodManager(client MirrorClient, secretManager secret.Manager, configMapManager configmap.Manager, cpm checkpointmanager.CheckpointManager) Manager {
func NewBasicPodManager(client MirrorClient, secretManager secret.Manager, configMapManager configmap.Manager) Manager {
pm := &basicManager{}
pm.secretManager = secretManager
pm.configMapManager = configMapManager
pm.checkpointManager = cpm
pm.MirrorClient = client
pm.SetPods(nil)
return pm
@ -161,11 +155,6 @@ func (pm *basicManager) UpdatePod(pod *v1.Pod) {
pm.lock.Lock()
defer pm.lock.Unlock()
pm.updatePodsInternal(pod)
if pm.checkpointManager != nil {
if err := checkpoint.WritePod(pm.checkpointManager, pod); err != nil {
klog.Errorf("Error writing checkpoint for pod: %v", pod.GetName())
}
}
}
func isPodInTerminatedState(pod *v1.Pod) bool {
@ -244,11 +233,6 @@ func (pm *basicManager) DeletePod(pod *v1.Pod) {
delete(pm.podByUID, kubetypes.ResolvedPodUID(pod.UID))
delete(pm.podByFullName, podFullName)
}
if pm.checkpointManager != nil {
if err := checkpoint.DeletePod(pm.checkpointManager, pod); err != nil {
klog.Errorf("Error deleting checkpoint for pod: %v", pod.GetName())
}
}
}
func (pm *basicManager) GetPods() []*v1.Pod {

View File

@ -34,7 +34,7 @@ func newTestManager() (*basicManager, *podtest.FakeMirrorClient) {
fakeMirrorClient := podtest.NewFakeMirrorClient()
secretManager := secret.NewFakeManager()
configMapManager := configmap.NewFakeManager()
manager := NewBasicPodManager(fakeMirrorClient, secretManager, configMapManager, podtest.NewMockCheckpointManager()).(*basicManager)
manager := NewBasicPodManager(fakeMirrorClient, secretManager, configMapManager).(*basicManager)
return manager, fakeMirrorClient
}

View File

@ -13,8 +13,6 @@ go_library(
],
importpath = "k8s.io/kubernetes/pkg/kubelet/pod/testing",
deps = [
"//pkg/kubelet/checkpoint:go_default_library",
"//pkg/kubelet/checkpointmanager:go_default_library",
"//pkg/kubelet/container:go_default_library",
"//pkg/kubelet/types:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",

View File

@ -22,8 +22,6 @@ import (
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
cp "k8s.io/kubernetes/pkg/kubelet/checkpoint"
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
)
@ -85,37 +83,3 @@ func (fmc *FakeMirrorClient) GetCounts(podFullName string) (int, int) {
defer fmc.mirrorPodLock.RUnlock()
return fmc.createCounts[podFullName], fmc.deleteCounts[podFullName]
}
type MockCheckpointManager struct {
checkpoint map[string]*cp.Data
}
func (ckm *MockCheckpointManager) CreateCheckpoint(checkpointKey string, checkpoint checkpointmanager.Checkpoint) error {
ckm.checkpoint[checkpointKey] = (checkpoint.(*cp.Data))
return nil
}
func (ckm *MockCheckpointManager) GetCheckpoint(checkpointKey string, checkpoint checkpointmanager.Checkpoint) error {
*(checkpoint.(*cp.Data)) = *(ckm.checkpoint[checkpointKey])
return nil
}
func (ckm *MockCheckpointManager) RemoveCheckpoint(checkpointKey string) error {
_, ok := ckm.checkpoint[checkpointKey]
if ok {
delete(ckm.checkpoint, "moo")
}
return nil
}
func (ckm *MockCheckpointManager) ListCheckpoints() ([]string, error) {
var keys []string
for key := range ckm.checkpoint {
keys = append(keys, key)
}
return keys, nil
}
func NewMockCheckpointManager() checkpointmanager.CheckpointManager {
return &MockCheckpointManager{checkpoint: make(map[string]*cp.Data)}
}

View File

@ -104,7 +104,7 @@ func setTestProbe(pod *v1.Pod, probeType probeType, probeSpec v1.Probe) {
}
func newTestManager() *manager {
podManager := kubepod.NewBasicPodManager(nil, nil, nil, nil)
podManager := kubepod.NewBasicPodManager(nil, nil, nil)
// Add test pod to pod manager, so that status manager can get the pod from pod manager if needed.
podManager.AddPod(getTestPod())
m := NewManager(

View File

@ -119,7 +119,7 @@ func TestDoProbe(t *testing.T) {
}
// Clean up.
m.statusManager = status.NewManager(&fake.Clientset{}, kubepod.NewBasicPodManager(nil, nil, nil, nil), &statustest.FakePodDeletionSafetyProvider{})
m.statusManager = status.NewManager(&fake.Clientset{}, kubepod.NewBasicPodManager(nil, nil, nil), &statustest.FakePodDeletionSafetyProvider{})
resultsManager(m, probeType).Remove(testContainerID)
}
}

View File

@ -65,7 +65,7 @@ func TestRunOnce(t *testing.T) {
fakeSecretManager := secret.NewFakeManager()
fakeConfigMapManager := configmap.NewFakeManager()
podManager := kubepod.NewBasicPodManager(
podtest.NewFakeMirrorClient(), fakeSecretManager, fakeConfigMapManager, podtest.NewMockCheckpointManager())
podtest.NewFakeMirrorClient(), fakeSecretManager, fakeConfigMapManager)
fakeRuntime := &containertest.FakeRuntime{}
basePath, err := utiltesting.MkTmpdir("kubelet")
if err != nil {

View File

@ -81,7 +81,7 @@ func (m *manager) testSyncBatch() {
}
func newTestManager(kubeClient clientset.Interface) *manager {
podManager := kubepod.NewBasicPodManager(podtest.NewFakeMirrorClient(), kubesecret.NewFakeManager(), kubeconfigmap.NewFakeManager(), podtest.NewMockCheckpointManager())
podManager := kubepod.NewBasicPodManager(podtest.NewFakeMirrorClient(), kubesecret.NewFakeManager(), kubeconfigmap.NewFakeManager())
podManager.AddPod(getTestPod())
return NewManager(kubeClient, podManager, &statustest.FakePodDeletionSafetyProvider{}).(*manager)
}

View File

@ -48,8 +48,6 @@ const (
// Pods with the given ids have unexpected status in this source,
// kubelet should reconcile status with this source
RECONCILE
// Pods with the given ids have been restored from a checkpoint.
RESTORE
// These constants identify the sources of pods
// Updates from a file

View File

@ -1008,7 +1008,7 @@ func createDswpWithVolume(t *testing.T, pv *v1.PersistentVolume, pvc *v1.Persist
fakeSecretManager := secret.NewFakeManager()
fakeConfigMapManager := configmap.NewFakeManager()
fakePodManager := kubepod.NewBasicPodManager(
podtest.NewFakeMirrorClient(), fakeSecretManager, fakeConfigMapManager, podtest.NewMockCheckpointManager())
podtest.NewFakeMirrorClient(), fakeSecretManager, fakeConfigMapManager)
fakesDSW := cache.NewDesiredStateOfWorld(fakeVolumePluginMgr)
fakeASW := cache.NewActualStateOfWorld("fake", fakeVolumePluginMgr)

View File

@ -90,8 +90,7 @@ func TestGetMountedVolumesForPodAndGetVolumesInUse(t *testing.T) {
t.Fatalf("can't make a temp dir: %v", err)
}
defer os.RemoveAll(tmpDir)
cpm := podtest.NewMockCheckpointManager()
podManager := kubepod.NewBasicPodManager(podtest.NewFakeMirrorClient(), secret.NewFakeManager(), configmap.NewFakeManager(), cpm)
podManager := kubepod.NewBasicPodManager(podtest.NewFakeMirrorClient(), secret.NewFakeManager(), configmap.NewFakeManager())
node, pod, pv, claim := createObjects(test.pvMode, test.podMode)
kubeClient := fake.NewSimpleClientset(node, pod, pv, claim)
@ -147,8 +146,7 @@ func TestInitialPendingVolumesForPodAndGetVolumesInUse(t *testing.T) {
t.Fatalf("can't make a temp dir: %v", err)
}
defer os.RemoveAll(tmpDir)
cpm := podtest.NewMockCheckpointManager()
podManager := kubepod.NewBasicPodManager(podtest.NewFakeMirrorClient(), secret.NewFakeManager(), configmap.NewFakeManager(), cpm)
podManager := kubepod.NewBasicPodManager(podtest.NewFakeMirrorClient(), secret.NewFakeManager(), configmap.NewFakeManager())
node, pod, pv, claim := createObjects(v1.PersistentVolumeFilesystem, v1.PersistentVolumeFilesystem)
claim.Status = v1.PersistentVolumeClaimStatus{
@ -193,8 +191,7 @@ func TestGetExtraSupplementalGroupsForPod(t *testing.T) {
t.Fatalf("can't make a temp dir: %v", err)
}
defer os.RemoveAll(tmpDir)
cpm := podtest.NewMockCheckpointManager()
podManager := kubepod.NewBasicPodManager(podtest.NewFakeMirrorClient(), secret.NewFakeManager(), configmap.NewFakeManager(), cpm)
podManager := kubepod.NewBasicPodManager(podtest.NewFakeMirrorClient(), secret.NewFakeManager(), configmap.NewFakeManager())
node, pod, _, claim := createObjects(v1.PersistentVolumeFilesystem, v1.PersistentVolumeFilesystem)