Deprecating --bootstrap-checkpoint-path flag
This commit is contained in:
parent eda662b2fd
commit 0ed41c3f10
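The diff below removes the kubelet's alpha pod-checkpoint machinery end to end: the --bootstrap-checkpoint-path flag, the pkg/kubelet/checkpoint package, the RESTORE pod-update op, and the checkpoint wiring in the pod manager, pod config, and NewMainKubelet. For orientation, here is a minimal standalone sketch of the write-side convention the deleted package followed (a JSON blob of the pod plus a checksum, stored as Pod_<UID>.yaml and written only when the node.kubernetes.io/bootstrap-checkpoint annotation is set). The pod type, checksum, and file handling below are simplified stand-ins for illustration only, not the kubelet's actual API:

    package main

    import (
        "encoding/json"
        "fmt"
        "hash/fnv"
        "os"
        "path/filepath"
    )

    // pod stands in for v1.Pod; the fields here are illustrative only.
    type pod struct {
        UID         string
        Name        string
        Annotations map[string]string
    }

    // data mirrors the shape of the deleted checkpoint.Data: the pod plus a checksum.
    type data struct {
        Pod      *pod
        Checksum uint32
    }

    // checksum is a simplified stand-in for the checkpointmanager/checksum package.
    func checksum(p *pod) uint32 {
        h := fnv.New32a()
        b, _ := json.Marshal(p)
        h.Write(b)
        return h.Sum32()
    }

    // key follows the deleted getPodKey convention: "Pod_<UID>.yaml".
    func key(p *pod) string { return fmt.Sprintf("Pod_%s.yaml", p.UID) }

    // writePod persists a checkpoint only when the bootstrap annotation is set,
    // and otherwise drops any stale checkpoint, like the deleted checkpoint.WritePod.
    func writePod(dir string, p *pod) error {
        if p.Annotations["node.kubernetes.io/bootstrap-checkpoint"] != "true" {
            return os.RemoveAll(filepath.Join(dir, key(p)))
        }
        b, err := json.Marshal(data{Pod: p, Checksum: checksum(p)})
        if err != nil {
            return err
        }
        return os.WriteFile(filepath.Join(dir, key(p)), b, 0o600)
    }

    func main() {
        dir, _ := os.MkdirTemp("", "checkpoint")
        defer os.RemoveAll(dir)
        p := &pod{UID: "1", Name: "foo", Annotations: map[string]string{"node.kubernetes.io/bootstrap-checkpoint": "true"}}
        if err := writePod(dir, p); err != nil {
            fmt.Println("write:", err)
        }
        entries, _ := os.ReadDir(dir)
        for _, e := range entries {
            fmt.Println("checkpointed:", e.Name()) // checkpointed: Pod_1.yaml
        }
    }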
@@ -140,9 +140,6 @@ type KubeletFlags struct {
     ExitOnLockContention bool
     // seccompProfileRoot is the directory path for seccomp profiles.
     SeccompProfileRoot string
-    // bootstrapCheckpointPath is the path to the directory containing pod checkpoints to
-    // run on restore
-    BootstrapCheckpointPath string

     // DEPRECATED FLAGS
     // minimumGCAge is the minimum age for a finished container before it is
@@ -356,7 +353,6 @@ func (f *KubeletFlags) AddFlags(mainfs *pflag.FlagSet) {
     fs.Var(&bindableNodeLabels, "node-labels", fmt.Sprintf("<Warning: Alpha feature> Labels to add when registering the node in the cluster. Labels must be key=value pairs separated by ','. Labels in the 'kubernetes.io' namespace must begin with an allowed prefix (%s) or be in the specifically allowed set (%s)", strings.Join(kubeletapis.KubeletLabelNamespaces(), ", "), strings.Join(kubeletapis.KubeletLabels(), ", ")))
     fs.StringVar(&f.LockFilePath, "lock-file", f.LockFilePath, "<Warning: Alpha feature> The path to file for kubelet to use as a lock file.")
     fs.BoolVar(&f.ExitOnLockContention, "exit-on-lock-contention", f.ExitOnLockContention, "Whether kubelet should exit upon lock-file contention.")
-    fs.StringVar(&f.BootstrapCheckpointPath, "bootstrap-checkpoint-path", f.BootstrapCheckpointPath, "<Warning: Alpha feature> Path to the directory where the checkpoints are stored")

     // DEPRECATED FLAGS
     fs.StringVar(&f.BootstrapKubeconfig, "experimental-bootstrap-kubeconfig", f.BootstrapKubeconfig, "")
@@ -1122,7 +1122,6 @@ func RunKubelet(kubeServer *options.KubeletServer, kubeDeps *kubelet.Dependencie
         kubeServer.KeepTerminatedPodVolumes,
         kubeServer.NodeLabels,
         kubeServer.SeccompProfileRoot,
-        kubeServer.BootstrapCheckpointPath,
         kubeServer.NodeStatusMaxImages)
     if err != nil {
         return fmt.Errorf("failed to create kubelet: %v", err)
@@ -1196,7 +1195,6 @@ func createAndInitKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
     keepTerminatedPodVolumes bool,
     nodeLabels map[string]string,
     seccompProfileRoot string,
-    bootstrapCheckpointPath string,
     nodeStatusMaxImages int32) (k kubelet.Bootstrap, err error) {
     // TODO: block until all sources have delivered at least one update to the channel, or break the sync loop
     // up into "per source" synchronizations
@@ -1228,7 +1226,6 @@ func createAndInitKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
         keepTerminatedPodVolumes,
         nodeLabels,
         seccompProfileRoot,
-        bootstrapCheckpointPath,
         nodeStatusMaxImages)
     if err != nil {
         return nil, err
@@ -61,10 +61,6 @@ const (
     // This annotation can be attached to node.
     ObjectTTLAnnotationKey string = "node.alpha.kubernetes.io/ttl"

-    // BootstrapCheckpointAnnotationKey represents a Resource (Pod) that should be checkpointed by
-    // the kubelet prior to running
-    BootstrapCheckpointAnnotationKey string = "node.kubernetes.io/bootstrap-checkpoint"
-
     // NonConvertibleAnnotationPrefix annotation key prefix used to identify non-convertible json paths.
     NonConvertibleAnnotationPrefix = "non-convertible.kubernetes.io"

@@ -47,7 +47,6 @@ go_library(
         "//pkg/kubelet/apis/podresources:go_default_library",
         "//pkg/kubelet/cadvisor:go_default_library",
         "//pkg/kubelet/certificate:go_default_library",
-        "//pkg/kubelet/checkpointmanager:go_default_library",
         "//pkg/kubelet/cloudresource:go_default_library",
         "//pkg/kubelet/cm:go_default_library",
         "//pkg/kubelet/config:go_default_library",
@@ -280,7 +279,6 @@ filegroup(
         "//pkg/kubelet/apis:all-srcs",
         "//pkg/kubelet/cadvisor:all-srcs",
         "//pkg/kubelet/certificate:all-srcs",
-        "//pkg/kubelet/checkpoint:all-srcs",
         "//pkg/kubelet/checkpointmanager:all-srcs",
         "//pkg/kubelet/client:all-srcs",
         "//pkg/kubelet/cloudresource:all-srcs",
@@ -1,41 +0,0 @@
-load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
-
-go_library(
-    name = "go_default_library",
-    srcs = ["checkpoint.go"],
-    importpath = "k8s.io/kubernetes/pkg/kubelet/checkpoint",
-    visibility = ["//visibility:public"],
-    deps = [
-        "//pkg/apis/core:go_default_library",
-        "//pkg/kubelet/checkpointmanager:go_default_library",
-        "//pkg/kubelet/checkpointmanager/checksum:go_default_library",
-        "//staging/src/k8s.io/api/core/v1:go_default_library",
-        "//vendor/k8s.io/klog/v2:go_default_library",
-    ],
-)
-
-go_test(
-    name = "go_default_test",
-    srcs = ["checkpoint_test.go"],
-    embed = [":go_default_library"],
-    deps = [
-        "//pkg/apis/core:go_default_library",
-        "//pkg/kubelet/checkpointmanager:go_default_library",
-        "//staging/src/k8s.io/api/core/v1:go_default_library",
-        "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
-    ],
-)
-
-filegroup(
-    name = "package-srcs",
-    srcs = glob(["**"]),
-    tags = ["automanaged"],
-    visibility = ["//visibility:private"],
-)
-
-filegroup(
-    name = "all-srcs",
-    srcs = [":package-srcs"],
-    tags = ["automanaged"],
-    visibility = ["//visibility:public"],
-)
@@ -1,128 +0,0 @@
-/*
-Copyright 2017 The Kubernetes Authors.
-
-Licensed under the Apache License, Version 2.0 (the "License");
-you may not use this file except in compliance with the License.
-You may obtain a copy of the License at
-
-    http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-*/
-
-package checkpoint
-
-import (
-    "encoding/json"
-    "fmt"
-
-    "k8s.io/klog/v2"
-
-    "k8s.io/api/core/v1"
-    "k8s.io/kubernetes/pkg/apis/core"
-    "k8s.io/kubernetes/pkg/kubelet/checkpointmanager"
-    "k8s.io/kubernetes/pkg/kubelet/checkpointmanager/checksum"
-)
-
-const (
-    // Delimiter used on checkpoints written to disk
-    delimiter = "_"
-    podPrefix = "Pod"
-)
-
-// PodCheckpoint defines the operations to retrieve pod
-type PodCheckpoint interface {
-    checkpointmanager.Checkpoint
-    GetPod() *v1.Pod
-}
-
-// Data to be stored as checkpoint
-type Data struct {
-    Pod      *v1.Pod
-    Checksum checksum.Checksum
-}
-
-// NewPodCheckpoint returns new pod checkpoint
-func NewPodCheckpoint(pod *v1.Pod) PodCheckpoint {
-    return &Data{Pod: pod}
-}
-
-// MarshalCheckpoint returns marshalled data
-func (cp *Data) MarshalCheckpoint() ([]byte, error) {
-    cp.Checksum = checksum.New(*cp.Pod)
-    return json.Marshal(*cp)
-}
-
-// UnmarshalCheckpoint returns unmarshalled data
-func (cp *Data) UnmarshalCheckpoint(blob []byte) error {
-    return json.Unmarshal(blob, cp)
-}
-
-// VerifyChecksum verifies that passed checksum is same as calculated checksum
-func (cp *Data) VerifyChecksum() error {
-    return cp.Checksum.Verify(*cp.Pod)
-}
-
-// GetPod retrieves the pod from the checkpoint
-func (cp *Data) GetPod() *v1.Pod {
-    return cp.Pod
-}
-
-// checkAnnotations will validate the checkpoint annotations exist on the Pod
-func checkAnnotations(pod *v1.Pod) bool {
-    if podAnnotations := pod.GetAnnotations(); podAnnotations != nil {
-        if podAnnotations[core.BootstrapCheckpointAnnotationKey] == "true" {
-            return true
-        }
-    }
-    return false
-}
-
-//getPodKey returns the full qualified path for the pod checkpoint
-func getPodKey(pod *v1.Pod) string {
-    return fmt.Sprintf("%s%s%v.yaml", podPrefix, delimiter, pod.GetUID())
-}
-
-// LoadPods Loads All Checkpoints from disk
-func LoadPods(cpm checkpointmanager.CheckpointManager) ([]*v1.Pod, error) {
-    pods := make([]*v1.Pod, 0)
-
-    checkpointKeys, err := cpm.ListCheckpoints()
-    if err != nil {
-        klog.Errorf("Failed to list checkpoints: %v", err)
-    }
-
-    for _, key := range checkpointKeys {
-        checkpoint := NewPodCheckpoint(nil)
-        err := cpm.GetCheckpoint(key, checkpoint)
-        if err != nil {
-            klog.Errorf("Failed to retrieve checkpoint for pod %q: %v", key, err)
-            continue
-        }
-        pods = append(pods, checkpoint.GetPod())
-    }
-    return pods, nil
-}
-
-// WritePod a checkpoint to a file on disk if annotation is present
-func WritePod(cpm checkpointmanager.CheckpointManager, pod *v1.Pod) error {
-    var err error
-    if checkAnnotations(pod) {
-        data := NewPodCheckpoint(pod)
-        err = cpm.CreateCheckpoint(getPodKey(pod), data)
-    } else {
-        // This is to handle an edge where a pod update could remove
-        // an annotation and the checkpoint should then be removed.
-        err = cpm.RemoveCheckpoint(getPodKey(pod))
-    }
-    return err
-}
-
-// DeletePod deletes a checkpoint from disk if present
-func DeletePod(cpm checkpointmanager.CheckpointManager, pod *v1.Pod) error {
-    return cpm.RemoveCheckpoint(getPodKey(pod))
-}
@@ -1,124 +0,0 @@
-/*
-Copyright 2017 The Kubernetes Authors.
-
-Licensed under the Apache License, Version 2.0 (the "License");
-you may not use this file except in compliance with the License.
-You may obtain a copy of the License at
-
-    http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-*/
-
-package checkpoint
-
-import (
-    "io/ioutil"
-    "os"
-    "reflect"
-    "testing"
-
-    "k8s.io/api/core/v1"
-    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
-    "k8s.io/kubernetes/pkg/apis/core"
-    "k8s.io/kubernetes/pkg/kubelet/checkpointmanager"
-)
-
-// TestWriteLoadDeletePods validates all combinations of write, load, and delete
-func TestWriteLoadDeletePods(t *testing.T) {
-    testPods := []struct {
-        pod     *v1.Pod
-        written bool
-    }{
-        {
-            pod: &v1.Pod{
-                ObjectMeta: metav1.ObjectMeta{
-                    Name:        "Foo",
-                    Annotations: map[string]string{core.BootstrapCheckpointAnnotationKey: "true"},
-                    UID:         "1",
-                },
-            },
-            written: true,
-        },
-        {
-            pod: &v1.Pod{
-                ObjectMeta: metav1.ObjectMeta{
-                    Name:        "Foo2",
-                    Annotations: map[string]string{core.BootstrapCheckpointAnnotationKey: "true"},
-                    UID:         "2",
-                },
-            },
-            written: true,
-        },
-        {
-            pod: &v1.Pod{
-                ObjectMeta: metav1.ObjectMeta{
-                    Name: "Bar",
-                    UID:  "3",
-                },
-            },
-            written: false,
-        },
-    }
-
-    dir, err := ioutil.TempDir("", "checkpoint")
-    if err != nil {
-        t.Errorf("Failed to allocate temp directory for TestWriteLoadDeletePods error=%v", err)
-    }
-    defer os.RemoveAll(dir)
-
-    cpm, err := checkpointmanager.NewCheckpointManager(dir)
-    if err != nil {
-        t.Errorf("Failed to initialize checkpoint manager error=%v", err)
-    }
-    for _, p := range testPods {
-        // Write pods should always pass unless there is an fs error
-        if err := WritePod(cpm, p.pod); err != nil {
-            t.Errorf("Failed to Write Pod: %v", err)
-        }
-    }
-    // verify the correct written files are loaded from disk
-    pods, err := LoadPods(cpm)
-    if err != nil {
-        t.Errorf("Failed to Load Pods: %v", err)
-    }
-    // loop through contents and check make sure
-    // what was loaded matched the expected results.
-    for _, p := range testPods {
-        pname := p.pod.GetName()
-        var lpod *v1.Pod
-        for _, check := range pods {
-            if check.GetName() == pname {
-                lpod = check
-                break
-            }
-        }
-        if p.written {
-            if lpod != nil {
-                if !reflect.DeepEqual(p.pod, lpod) {
-                    t.Errorf("expected %#v, \ngot %#v", p.pod, lpod)
-                }
-            } else {
-                t.Errorf("Got unexpected result for %v, should have been loaded", pname)
-            }
-        } else if lpod != nil {
-            t.Errorf("Got unexpected result for %v, should not have been loaded", pname)
-        }
-        err = DeletePod(cpm, p.pod)
-        if err != nil {
-            t.Errorf("Failed to delete pod %v", pname)
-        }
-    }
-    // finally validate the contents of the directory is empty.
-    files, err := ioutil.ReadDir(dir)
-    if err != nil {
-        t.Errorf("Failed to read directory %v", dir)
-    }
-    if len(files) > 0 {
-        t.Errorf("Directory %v should be empty but found %#v", dir, files)
-    }
-}
@@ -25,8 +25,6 @@ go_library(
         "//pkg/apis/core/v1:go_default_library",
         "//pkg/apis/core/validation:go_default_library",
         "//pkg/features:go_default_library",
-        "//pkg/kubelet/checkpoint:go_default_library",
-        "//pkg/kubelet/checkpointmanager:go_default_library",
         "//pkg/kubelet/container:go_default_library",
         "//pkg/kubelet/events:go_default_library",
         "//pkg/kubelet/types:go_default_library",
@@ -77,8 +75,6 @@ go_test(
         "//pkg/apis/core:go_default_library",
         "//pkg/apis/core/v1:go_default_library",
         "//pkg/apis/core/validation:go_default_library",
-        "//pkg/kubelet/checkpoint:go_default_library",
-        "//pkg/kubelet/checkpointmanager:go_default_library",
         "//pkg/kubelet/types:go_default_library",
         "//pkg/securitycontext:go_default_library",
         "//staging/src/k8s.io/api/core/v1:go_default_library",
@@ -26,8 +26,6 @@ import (
     "k8s.io/apimachinery/pkg/util/sets"
     "k8s.io/client-go/tools/record"
     "k8s.io/klog/v2"
-    "k8s.io/kubernetes/pkg/kubelet/checkpoint"
-    "k8s.io/kubernetes/pkg/kubelet/checkpointmanager"
     kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
     "k8s.io/kubernetes/pkg/kubelet/events"
     kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
@@ -63,9 +61,8 @@ type PodConfig struct {
     updates chan kubetypes.PodUpdate

     // contains the list of all configured sources
     sourcesLock sync.Mutex
     sources sets.String
-    checkpointManager checkpointmanager.CheckpointManager
 }

 // NewPodConfig creates an object that can merge many configuration sources into a stream
@@ -111,24 +108,6 @@ func (c *PodConfig) Sync() {
     c.pods.Sync()
 }

-// Restore restores pods from the checkpoint path, *once*
-func (c *PodConfig) Restore(path string, updates chan<- interface{}) error {
-    if c.checkpointManager != nil {
-        return nil
-    }
-    var err error
-    c.checkpointManager, err = checkpointmanager.NewCheckpointManager(path)
-    if err != nil {
-        return err
-    }
-    pods, err := checkpoint.LoadPods(c.checkpointManager)
-    if err != nil {
-        return err
-    }
-    updates <- kubetypes.PodUpdate{Pods: pods, Op: kubetypes.RESTORE, Source: kubetypes.ApiserverSource}
-    return nil
-}
-
 // podStorage manages the current pod state at any point in time and ensures updates
 // to the channel are delivered in order. Note that this object is an in-memory source of
 // "truth" and on creation contains zero entries. Once all previously read sources are
@@ -173,7 +152,7 @@ func (s *podStorage) Merge(source string, change interface{}) error {
     defer s.updateLock.Unlock()

     seenBefore := s.sourcesSeen.Has(source)
-    adds, updates, deletes, removes, reconciles, restores := s.merge(source, change)
+    adds, updates, deletes, removes, reconciles := s.merge(source, change)
     firstSet := !seenBefore && s.sourcesSeen.Has(source)

     // deliver update notifications
@@ -191,9 +170,6 @@ func (s *podStorage) Merge(source string, change interface{}) error {
     if len(deletes.Pods) > 0 {
         s.updates <- *deletes
     }
-    if len(restores.Pods) > 0 {
-        s.updates <- *restores
-    }
     if firstSet && len(adds.Pods) == 0 && len(updates.Pods) == 0 && len(deletes.Pods) == 0 {
         // Send an empty update when first seeing the source and there are
         // no ADD or UPDATE or DELETE pods from the source. This signals kubelet that
@@ -230,7 +206,7 @@ func (s *podStorage) Merge(source string, change interface{}) error {
     return nil
 }

-func (s *podStorage) merge(source string, change interface{}) (adds, updates, deletes, removes, reconciles, restores *kubetypes.PodUpdate) {
+func (s *podStorage) merge(source string, change interface{}) (adds, updates, deletes, removes, reconciles *kubetypes.PodUpdate) {
     s.podLock.Lock()
     defer s.podLock.Unlock()

@@ -239,7 +215,6 @@ func (s *podStorage) merge(source string, change interface{}) (adds, updates, de
     deletePods := []*v1.Pod{}
     removePods := []*v1.Pod{}
     reconcilePods := []*v1.Pod{}
-    restorePods := []*v1.Pod{}

     pods := s.pods[source]
     if pods == nil {
@@ -312,9 +287,6 @@ func (s *podStorage) merge(source string, change interface{}) (adds, updates, de
                 removePods = append(removePods, existing)
             }
         }
-    case kubetypes.RESTORE:
-        klog.V(4).Infof("Restoring pods for source %s", source)
-        restorePods = append(restorePods, update.Pods...)

     default:
         klog.Warningf("Received invalid update type: %v", update)
@@ -328,9 +300,8 @@ func (s *podStorage) merge(source string, change interface{}) (adds, updates, de
     deletes = &kubetypes.PodUpdate{Op: kubetypes.DELETE, Pods: copyPods(deletePods), Source: source}
     removes = &kubetypes.PodUpdate{Op: kubetypes.REMOVE, Pods: copyPods(removePods), Source: source}
     reconciles = &kubetypes.PodUpdate{Op: kubetypes.RECONCILE, Pods: copyPods(reconcilePods), Source: source}
-    restores = &kubetypes.PodUpdate{Op: kubetypes.RESTORE, Pods: copyPods(restorePods), Source: source}

-    return adds, updates, deletes, removes, reconciles, restores
+    return adds, updates, deletes, removes, reconciles
 }

 func (s *podStorage) markSourceSet(source string) {
@@ -17,9 +17,7 @@ limitations under the License.
 package config

 import (
-    "io/ioutil"
     "math/rand"
-    "os"
     "reflect"
     "sort"
     "strconv"
@@ -32,9 +30,6 @@ import (
     "k8s.io/apimachinery/pkg/types"
     "k8s.io/client-go/kubernetes/scheme"
     "k8s.io/client-go/tools/record"
-    "k8s.io/kubernetes/pkg/apis/core"
-    "k8s.io/kubernetes/pkg/kubelet/checkpoint"
-    "k8s.io/kubernetes/pkg/kubelet/checkpointmanager"
     kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
     "k8s.io/kubernetes/pkg/securitycontext"
 )
@@ -90,14 +85,6 @@ func CreatePodUpdate(op kubetypes.PodOperation, source string, pods ...*v1.Pod)
     return kubetypes.PodUpdate{Pods: pods, Op: op, Source: source}
 }

-func createPodConfigTesterByChannel(mode PodConfigNotificationMode, channelName string) (chan<- interface{}, <-chan kubetypes.PodUpdate, *PodConfig) {
-    eventBroadcaster := record.NewBroadcaster()
-    config := NewPodConfig(mode, eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "kubelet"}))
-    channel := config.Channel(channelName)
-    ch := config.Updates()
-    return channel, ch, config
-}
-
 func createPodConfigTester(mode PodConfigNotificationMode) (chan<- interface{}, <-chan kubetypes.PodUpdate, *PodConfig) {
     eventBroadcaster := record.NewBroadcaster()
     config := NewPodConfig(mode, eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "kubelet"}))
@@ -426,35 +413,3 @@ func TestPodUpdateLabels(t *testing.T) {
     expectPodUpdate(t, ch, CreatePodUpdate(kubetypes.UPDATE, TestSource, pod))

 }
-
-func TestPodRestore(t *testing.T) {
-    tmpDir, _ := ioutil.TempDir("", "")
-    defer os.RemoveAll(tmpDir)
-
-    pod := CreateValidPod("api-server", "kube-default")
-    pod.Annotations = make(map[string]string)
-    pod.Annotations["kubernetes.io/config.source"] = kubetypes.ApiserverSource
-    pod.Annotations[core.BootstrapCheckpointAnnotationKey] = "true"
-
-    // Create Checkpointer
-    checkpointManager, err := checkpointmanager.NewCheckpointManager(tmpDir)
-    if err != nil {
-        t.Fatalf("failed to initialize checkpoint manager: %v", err)
-    }
-    if err := checkpoint.WritePod(checkpointManager, pod); err != nil {
-        t.Fatalf("Error writing checkpoint for pod: %v", pod.GetName())
-    }
-
-    // Restore checkpoint
-    channel, ch, config := createPodConfigTesterByChannel(PodConfigNotificationIncremental, kubetypes.ApiserverSource)
-    if err := config.Restore(tmpDir, channel); err != nil {
-        t.Fatalf("Restore returned error: %v", err)
-    }
-    expectPodUpdate(t, ch, CreatePodUpdate(kubetypes.RESTORE, kubetypes.ApiserverSource, pod))
-
-    // Verify Restore only happen once
-    if err := config.Restore(tmpDir, channel); err != nil {
-        t.Fatalf("The second restore returned error: %v", err)
-    }
-    expectNoPodUpdate(t, ch)
-}
@@ -66,7 +66,6 @@ import (
     "k8s.io/kubernetes/pkg/kubelet/apis/podresources"
     "k8s.io/kubernetes/pkg/kubelet/cadvisor"
     kubeletcertificate "k8s.io/kubernetes/pkg/kubelet/certificate"
-    "k8s.io/kubernetes/pkg/kubelet/checkpointmanager"
     "k8s.io/kubernetes/pkg/kubelet/cloudresource"
     "k8s.io/kubernetes/pkg/kubelet/cm"
     "k8s.io/kubernetes/pkg/kubelet/config"
@@ -249,7 +248,7 @@ type DockerOptions struct {

 // makePodSourceConfig creates a config.PodConfig from the given
 // KubeletConfiguration or returns an error.
-func makePodSourceConfig(kubeCfg *kubeletconfiginternal.KubeletConfiguration, kubeDeps *Dependencies, nodeName types.NodeName, bootstrapCheckpointPath string) (*config.PodConfig, error) {
+func makePodSourceConfig(kubeCfg *kubeletconfiginternal.KubeletConfiguration, kubeDeps *Dependencies, nodeName types.NodeName) (*config.PodConfig, error) {
     manifestURLHeader := make(http.Header)
     if len(kubeCfg.StaticPodURLHeader) > 0 {
         for k, v := range kubeCfg.StaticPodURLHeader {
@@ -274,20 +273,7 @@ func makePodSourceConfig(kubeCfg *kubeletconfiginternal.KubeletConfiguration, ku
         config.NewSourceURL(kubeCfg.StaticPodURL, manifestURLHeader, nodeName, kubeCfg.HTTPCheckFrequency.Duration, cfg.Channel(kubetypes.HTTPSource))
     }

-    // Restore from the checkpoint path
-    // NOTE: This MUST happen before creating the apiserver source
-    // below, or the checkpoint would override the source of truth.
-
     var updatechannel chan<- interface{}
-    if bootstrapCheckpointPath != "" {
-        klog.Infof("Adding checkpoint path: %v", bootstrapCheckpointPath)
-        updatechannel = cfg.Channel(kubetypes.ApiserverSource)
-        err := cfg.Restore(bootstrapCheckpointPath, updatechannel)
-        if err != nil {
-            return nil, err
-        }
-    }
-
     if kubeDeps.KubeClient != nil {
         klog.Infof("Watching apiserver")
         if updatechannel == nil {
@@ -374,7 +360,6 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
     keepTerminatedPodVolumes bool,
     nodeLabels map[string]string,
     seccompProfileRoot string,
-    bootstrapCheckpointPath string,
     nodeStatusMaxImages int32) (*Kubelet, error) {
     if rootDirectory == "" {
         return nil, fmt.Errorf("invalid root directory %q", rootDirectory)
@@ -397,7 +382,7 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,

     if kubeDeps.PodConfig == nil {
         var err error
-        kubeDeps.PodConfig, err = makePodSourceConfig(kubeCfg, kubeDeps, nodeName, bootstrapCheckpointPath)
+        kubeDeps.PodConfig, err = makePodSourceConfig(kubeCfg, kubeDeps, nodeName)
         if err != nil {
             return nil, err
         }
@@ -584,18 +569,11 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,

     klet.livenessManager = proberesults.NewManager()
     klet.startupManager = proberesults.NewManager()

     klet.podCache = kubecontainer.NewCache()
-    var checkpointManager checkpointmanager.CheckpointManager
-    if bootstrapCheckpointPath != "" {
-        checkpointManager, err = checkpointmanager.NewCheckpointManager(bootstrapCheckpointPath)
-        if err != nil {
-            return nil, fmt.Errorf("failed to initialize checkpoint manager: %+v", err)
-        }
-    }
     // podManager is also responsible for keeping secretManager and configMapManager contents up-to-date.
     mirrorPodClient := kubepod.NewBasicMirrorClient(klet.kubeClient, string(nodeName), nodeLister)
-    klet.podManager = kubepod.NewBasicPodManager(mirrorPodClient, secretManager, configMapManager, checkpointManager)
+    klet.podManager = kubepod.NewBasicPodManager(mirrorPodClient, secretManager, configMapManager)

     klet.statusManager = status.NewManager(klet.kubeClient, klet.podManager, klet)

@@ -1853,28 +1831,15 @@ func (kl *Kubelet) syncLoopIteration(configCh <-chan kubetypes.PodUpdate, handle
         klog.V(2).Infof("SyncLoop (DELETE, %q): %q", u.Source, format.Pods(u.Pods))
         // DELETE is treated as a UPDATE because of graceful deletion.
         handler.HandlePodUpdates(u.Pods)
-    case kubetypes.RESTORE:
-        klog.V(2).Infof("SyncLoop (RESTORE, %q): %q", u.Source, format.Pods(u.Pods))
-        // These are pods restored from the checkpoint. Treat them as new
-        // pods.
-        handler.HandlePodAdditions(u.Pods)
     case kubetypes.SET:
         // TODO: Do we want to support this?
         klog.Errorf("Kubelet does not support snapshot update")
+    default:
+        klog.Errorf("Invalid event type received: %d.", u.Op)
     }

-    if u.Op != kubetypes.RESTORE {
-        // If the update type is RESTORE, it means that the update is from
-        // the pod checkpoints and may be incomplete. Do not mark the
-        // source as ready.
-
-        // Mark the source ready after receiving at least one update from the
-        // source. Once all the sources are marked ready, various cleanup
-        // routines will start reclaiming resources. It is important that this
-        // takes place only after kubelet calls the update handler to process
-        // the update to ensure the internal pod cache is up-to-date.
-        kl.sourcesReady.AddSource(u.Source)
-    }
+    kl.sourcesReady.AddSource(u.Source)
 case e := <-plegCh:
     if isSyncPodWorthy(e) {
         // PLEG event for a pod; sync it.
@@ -221,7 +221,7 @@ func newTestKubeletWithImageList(
     kubelet.secretManager = secretManager
     configMapManager := configmap.NewSimpleConfigMapManager(kubelet.kubeClient)
     kubelet.configMapManager = configMapManager
-    kubelet.podManager = kubepod.NewBasicPodManager(fakeMirrorClient, kubelet.secretManager, kubelet.configMapManager, podtest.NewMockCheckpointManager())
+    kubelet.podManager = kubepod.NewBasicPodManager(fakeMirrorClient, kubelet.secretManager, kubelet.configMapManager)
     kubelet.statusManager = status.NewManager(fakeKubeClient, kubelet.podManager, &statustest.FakePodDeletionSafetyProvider{})

     kubelet.containerRuntime = fakeRuntime
@@ -14,8 +14,6 @@ go_library(
     ],
     importpath = "k8s.io/kubernetes/pkg/kubelet/pod",
     deps = [
-        "//pkg/kubelet/checkpoint:go_default_library",
-        "//pkg/kubelet/checkpointmanager:go_default_library",
         "//pkg/kubelet/configmap:go_default_library",
         "//pkg/kubelet/container:go_default_library",
         "//pkg/kubelet/secret:go_default_library",
@@ -19,12 +19,8 @@ package pod
 import (
     "sync"

-    "k8s.io/klog/v2"
-
     "k8s.io/api/core/v1"
     "k8s.io/apimachinery/pkg/types"
-    "k8s.io/kubernetes/pkg/kubelet/checkpoint"
-    "k8s.io/kubernetes/pkg/kubelet/checkpointmanager"
     "k8s.io/kubernetes/pkg/kubelet/configmap"
     kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
     "k8s.io/kubernetes/pkg/kubelet/secret"
@@ -120,20 +116,18 @@ type basicManager struct {
     translationByUID map[kubetypes.MirrorPodUID]kubetypes.ResolvedPodUID

     // basicManager is keeping secretManager and configMapManager up-to-date.
     secretManager secret.Manager
     configMapManager configmap.Manager
-    checkpointManager checkpointmanager.CheckpointManager

     // A mirror pod client to create/delete mirror pods.
     MirrorClient
 }

 // NewBasicPodManager returns a functional Manager.
-func NewBasicPodManager(client MirrorClient, secretManager secret.Manager, configMapManager configmap.Manager, cpm checkpointmanager.CheckpointManager) Manager {
+func NewBasicPodManager(client MirrorClient, secretManager secret.Manager, configMapManager configmap.Manager) Manager {
     pm := &basicManager{}
     pm.secretManager = secretManager
     pm.configMapManager = configMapManager
-    pm.checkpointManager = cpm
     pm.MirrorClient = client
     pm.SetPods(nil)
     return pm
@@ -161,11 +155,6 @@ func (pm *basicManager) UpdatePod(pod *v1.Pod) {
     pm.lock.Lock()
     defer pm.lock.Unlock()
     pm.updatePodsInternal(pod)
-    if pm.checkpointManager != nil {
-        if err := checkpoint.WritePod(pm.checkpointManager, pod); err != nil {
-            klog.Errorf("Error writing checkpoint for pod: %v", pod.GetName())
-        }
-    }
 }

 func isPodInTerminatedState(pod *v1.Pod) bool {
@@ -244,11 +233,6 @@ func (pm *basicManager) DeletePod(pod *v1.Pod) {
         delete(pm.podByUID, kubetypes.ResolvedPodUID(pod.UID))
         delete(pm.podByFullName, podFullName)
     }
-    if pm.checkpointManager != nil {
-        if err := checkpoint.DeletePod(pm.checkpointManager, pod); err != nil {
-            klog.Errorf("Error deleting checkpoint for pod: %v", pod.GetName())
-        }
-    }
 }

 func (pm *basicManager) GetPods() []*v1.Pod {
@@ -34,7 +34,7 @@ func newTestManager() (*basicManager, *podtest.FakeMirrorClient) {
     fakeMirrorClient := podtest.NewFakeMirrorClient()
     secretManager := secret.NewFakeManager()
     configMapManager := configmap.NewFakeManager()
-    manager := NewBasicPodManager(fakeMirrorClient, secretManager, configMapManager, podtest.NewMockCheckpointManager()).(*basicManager)
+    manager := NewBasicPodManager(fakeMirrorClient, secretManager, configMapManager).(*basicManager)
     return manager, fakeMirrorClient
 }

@@ -13,8 +13,6 @@ go_library(
     ],
     importpath = "k8s.io/kubernetes/pkg/kubelet/pod/testing",
     deps = [
-        "//pkg/kubelet/checkpoint:go_default_library",
-        "//pkg/kubelet/checkpointmanager:go_default_library",
         "//pkg/kubelet/container:go_default_library",
         "//pkg/kubelet/types:go_default_library",
         "//staging/src/k8s.io/api/core/v1:go_default_library",
@@ -22,8 +22,6 @@ import (
     "k8s.io/api/core/v1"
     "k8s.io/apimachinery/pkg/types"
     "k8s.io/apimachinery/pkg/util/sets"
-    cp "k8s.io/kubernetes/pkg/kubelet/checkpoint"
-    "k8s.io/kubernetes/pkg/kubelet/checkpointmanager"
     kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
 )

@@ -85,37 +83,3 @@ func (fmc *FakeMirrorClient) GetCounts(podFullName string) (int, int) {
     defer fmc.mirrorPodLock.RUnlock()
     return fmc.createCounts[podFullName], fmc.deleteCounts[podFullName]
 }
-
-type MockCheckpointManager struct {
-    checkpoint map[string]*cp.Data
-}
-
-func (ckm *MockCheckpointManager) CreateCheckpoint(checkpointKey string, checkpoint checkpointmanager.Checkpoint) error {
-    ckm.checkpoint[checkpointKey] = (checkpoint.(*cp.Data))
-    return nil
-}
-
-func (ckm *MockCheckpointManager) GetCheckpoint(checkpointKey string, checkpoint checkpointmanager.Checkpoint) error {
-    *(checkpoint.(*cp.Data)) = *(ckm.checkpoint[checkpointKey])
-    return nil
-}
-
-func (ckm *MockCheckpointManager) RemoveCheckpoint(checkpointKey string) error {
-    _, ok := ckm.checkpoint[checkpointKey]
-    if ok {
-        delete(ckm.checkpoint, "moo")
-    }
-    return nil
-}
-
-func (ckm *MockCheckpointManager) ListCheckpoints() ([]string, error) {
-    var keys []string
-    for key := range ckm.checkpoint {
-        keys = append(keys, key)
-    }
-    return keys, nil
-}
-
-func NewMockCheckpointManager() checkpointmanager.CheckpointManager {
-    return &MockCheckpointManager{checkpoint: make(map[string]*cp.Data)}
-}
@@ -104,7 +104,7 @@ func setTestProbe(pod *v1.Pod, probeType probeType, probeSpec v1.Probe) {
 }

 func newTestManager() *manager {
-    podManager := kubepod.NewBasicPodManager(nil, nil, nil, nil)
+    podManager := kubepod.NewBasicPodManager(nil, nil, nil)
     // Add test pod to pod manager, so that status manager can get the pod from pod manager if needed.
     podManager.AddPod(getTestPod())
     m := NewManager(
@@ -119,7 +119,7 @@ func TestDoProbe(t *testing.T) {
         }

         // Clean up.
-        m.statusManager = status.NewManager(&fake.Clientset{}, kubepod.NewBasicPodManager(nil, nil, nil, nil), &statustest.FakePodDeletionSafetyProvider{})
+        m.statusManager = status.NewManager(&fake.Clientset{}, kubepod.NewBasicPodManager(nil, nil, nil), &statustest.FakePodDeletionSafetyProvider{})
         resultsManager(m, probeType).Remove(testContainerID)
     }
 }
@@ -65,7 +65,7 @@ func TestRunOnce(t *testing.T) {
     fakeSecretManager := secret.NewFakeManager()
     fakeConfigMapManager := configmap.NewFakeManager()
     podManager := kubepod.NewBasicPodManager(
-        podtest.NewFakeMirrorClient(), fakeSecretManager, fakeConfigMapManager, podtest.NewMockCheckpointManager())
+        podtest.NewFakeMirrorClient(), fakeSecretManager, fakeConfigMapManager)
     fakeRuntime := &containertest.FakeRuntime{}
     basePath, err := utiltesting.MkTmpdir("kubelet")
     if err != nil {
@@ -81,7 +81,7 @@ func (m *manager) testSyncBatch() {
 }

 func newTestManager(kubeClient clientset.Interface) *manager {
-    podManager := kubepod.NewBasicPodManager(podtest.NewFakeMirrorClient(), kubesecret.NewFakeManager(), kubeconfigmap.NewFakeManager(), podtest.NewMockCheckpointManager())
+    podManager := kubepod.NewBasicPodManager(podtest.NewFakeMirrorClient(), kubesecret.NewFakeManager(), kubeconfigmap.NewFakeManager())
     podManager.AddPod(getTestPod())
     return NewManager(kubeClient, podManager, &statustest.FakePodDeletionSafetyProvider{}).(*manager)
 }
@@ -48,8 +48,6 @@ const (
     // Pods with the given ids have unexpected status in this source,
     // kubelet should reconcile status with this source
     RECONCILE
-    // Pods with the given ids have been restored from a checkpoint.
-    RESTORE

     // These constants identify the sources of pods
     // Updates from a file
@@ -1008,7 +1008,7 @@ func createDswpWithVolume(t *testing.T, pv *v1.PersistentVolume, pvc *v1.Persist
     fakeSecretManager := secret.NewFakeManager()
     fakeConfigMapManager := configmap.NewFakeManager()
     fakePodManager := kubepod.NewBasicPodManager(
-        podtest.NewFakeMirrorClient(), fakeSecretManager, fakeConfigMapManager, podtest.NewMockCheckpointManager())
+        podtest.NewFakeMirrorClient(), fakeSecretManager, fakeConfigMapManager)

     fakesDSW := cache.NewDesiredStateOfWorld(fakeVolumePluginMgr)
     fakeASW := cache.NewActualStateOfWorld("fake", fakeVolumePluginMgr)
@@ -90,8 +90,7 @@ func TestGetMountedVolumesForPodAndGetVolumesInUse(t *testing.T) {
         t.Fatalf("can't make a temp dir: %v", err)
     }
     defer os.RemoveAll(tmpDir)
-    cpm := podtest.NewMockCheckpointManager()
-    podManager := kubepod.NewBasicPodManager(podtest.NewFakeMirrorClient(), secret.NewFakeManager(), configmap.NewFakeManager(), cpm)
+    podManager := kubepod.NewBasicPodManager(podtest.NewFakeMirrorClient(), secret.NewFakeManager(), configmap.NewFakeManager())

     node, pod, pv, claim := createObjects(test.pvMode, test.podMode)
     kubeClient := fake.NewSimpleClientset(node, pod, pv, claim)
@@ -147,8 +146,7 @@ func TestInitialPendingVolumesForPodAndGetVolumesInUse(t *testing.T) {
         t.Fatalf("can't make a temp dir: %v", err)
     }
     defer os.RemoveAll(tmpDir)
-    cpm := podtest.NewMockCheckpointManager()
-    podManager := kubepod.NewBasicPodManager(podtest.NewFakeMirrorClient(), secret.NewFakeManager(), configmap.NewFakeManager(), cpm)
+    podManager := kubepod.NewBasicPodManager(podtest.NewFakeMirrorClient(), secret.NewFakeManager(), configmap.NewFakeManager())

     node, pod, pv, claim := createObjects(v1.PersistentVolumeFilesystem, v1.PersistentVolumeFilesystem)
     claim.Status = v1.PersistentVolumeClaimStatus{
@@ -193,8 +191,7 @@ func TestGetExtraSupplementalGroupsForPod(t *testing.T) {
         t.Fatalf("can't make a temp dir: %v", err)
     }
     defer os.RemoveAll(tmpDir)
-    cpm := podtest.NewMockCheckpointManager()
-    podManager := kubepod.NewBasicPodManager(podtest.NewFakeMirrorClient(), secret.NewFakeManager(), configmap.NewFakeManager(), cpm)
+    podManager := kubepod.NewBasicPodManager(podtest.NewFakeMirrorClient(), secret.NewFakeManager(), configmap.NewFakeManager())

     node, pod, _, claim := createObjects(v1.PersistentVolumeFilesystem, v1.PersistentVolumeFilesystem)
