Make 'pod' package to use unified checkpointManager

Signed-off-by: vikaschoudhary16 <choudharyvikas16@gmail.com>
This commit is contained in:
vikaschoudhary16 2017-11-29 13:19:39 -05:00 committed by vikaschoudhary16
parent d62bd9ef65
commit cedbd93255
31 changed files with 176 additions and 125 deletions

View File

@ -168,6 +168,7 @@ pkg/kubelet/checkpointmanager/checksum
pkg/kubelet/checkpointmanager/testing/example_checkpoint_formats/v1
pkg/kubelet/client
pkg/kubelet/cm
pkg/kubelet/cm/devicemanager/checkpoint
pkg/kubelet/cm/util
pkg/kubelet/config
pkg/kubelet/configmap
@ -184,7 +185,6 @@ pkg/kubelet/dockershim/network/hostport
pkg/kubelet/dockershim/network/hostport/testing
pkg/kubelet/dockershim/network/kubenet
pkg/kubelet/dockershim/network/testing
pkg/kubelet/dockershim/testing
pkg/kubelet/events
pkg/kubelet/images
pkg/kubelet/kuberuntime

View File

@ -46,6 +46,7 @@ go_library(
"//pkg/kubelet/apis/kubeletconfig:go_default_library",
"//pkg/kubelet/cadvisor:go_default_library",
"//pkg/kubelet/certificate:go_default_library",
"//pkg/kubelet/checkpointmanager:go_default_library",
"//pkg/kubelet/cm:go_default_library",
"//pkg/kubelet/config:go_default_library",
"//pkg/kubelet/configmap:go_default_library",

View File

@ -7,9 +7,8 @@ go_library(
visibility = ["//visibility:public"],
deps = [
"//pkg/apis/core:go_default_library",
"//pkg/volume/util:go_default_library",
"//vendor/github.com/dchest/safefile:go_default_library",
"//vendor/github.com/ghodss/yaml:go_default_library",
"//pkg/kubelet/checkpointmanager:go_default_library",
"//pkg/kubelet/checkpointmanager/checksum:go_default_library",
"//vendor/github.com/golang/glog:go_default_library",
"//vendor/k8s.io/api/core/v1:go_default_library",
],
@ -21,6 +20,7 @@ go_test(
embed = [":go_default_library"],
deps = [
"//pkg/apis/core:go_default_library",
"//pkg/kubelet/checkpointmanager:go_default_library",
"//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
],

View File

@ -17,20 +17,15 @@ limitations under the License.
package checkpoint
import (
"encoding/json"
"fmt"
"io/ioutil"
"os"
"path/filepath"
"strings"
"sync"
"github.com/dchest/safefile"
"github.com/ghodss/yaml"
"github.com/golang/glog"
"k8s.io/api/core/v1"
"k8s.io/kubernetes/pkg/apis/core"
"k8s.io/kubernetes/pkg/volume/util"
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager"
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager/checksum"
)
const (
@ -39,54 +34,44 @@ const (
podPrefix = "Pod"
)
// Manager is the interface used to manage checkpoints
// which involves writing resources to disk to recover
// during restart or failure scenarios.
// https://github.com/kubernetes/community/pull/1241/files
type Manager interface {
// LoadPods will load checkpointed Pods from disk
LoadPods() ([]*v1.Pod, error)
// WritePod will serialize a Pod to disk
WritePod(pod *v1.Pod) error
// Deletes the checkpoint of the given pod from disk
DeletePod(pod *v1.Pod) error
type PodCheckpoint interface {
checkpointmanager.Checkpoint
GetPod() *v1.Pod
}
var instance Manager
var mutex = &sync.Mutex{}
// fileCheckPointManager - is a checkpointer that writes contents to disk
// The type information of the resource objects are encoded in the name
type fileCheckPointManager struct {
path string
// Data to be stored as checkpoint
type Data struct {
Pod *v1.Pod
Checksum checksum.Checksum
}
// NewCheckpointManager will create a Manager that points to the following path
func NewCheckpointManager(path string) Manager {
// NOTE: This is a precaution; current implementation should not run
// multiple checkpoint managers.
mutex.Lock()
defer mutex.Unlock()
instance = &fileCheckPointManager{path: path}
return instance
// NewPodCheckpoint returns new pod checkpoint
func NewPodCheckpoint(pod *v1.Pod) PodCheckpoint {
return &Data{Pod: pod}
}
// GetInstance will return the current Manager, there should be only one.
func GetInstance() Manager {
mutex.Lock()
defer mutex.Unlock()
return instance
// MarshalCheckpoint returns marshalled data
func (cp *Data) MarshalCheckpoint() ([]byte, error) {
cp.Checksum = checksum.New(*cp.Pod)
return json.Marshal(*cp)
}
// loadPod will load Pod Checkpoint yaml file.
func (fcp *fileCheckPointManager) loadPod(file string) (*v1.Pod, error) {
return util.LoadPodFromFile(file)
// UnmarshalCheckpoint returns unmarshalled data
func (cp *Data) UnmarshalCheckpoint(blob []byte) error {
return json.Unmarshal(blob, cp)
}
// VerifyChecksum verifies that passed checksum is same as calculated checksum
func (cp *Data) VerifyChecksum() error {
return cp.Checksum.Verify(*cp.Pod)
}
func (cp *Data) GetPod() *v1.Pod {
return cp.Pod
}
// checkAnnotations will validate the checkpoint annotations exist on the Pod
func (fcp *fileCheckPointManager) checkAnnotations(pod *v1.Pod) bool {
func checkAnnotations(pod *v1.Pod) bool {
if podAnnotations := pod.GetAnnotations(); podAnnotations != nil {
if podAnnotations[core.BootstrapCheckpointAnnotationKey] == "true" {
return true
@ -95,57 +80,49 @@ func (fcp *fileCheckPointManager) checkAnnotations(pod *v1.Pod) bool {
return false
}
// getPodPath returns the full qualified path for the pod checkpoint
func (fcp *fileCheckPointManager) getPodPath(pod *v1.Pod) string {
return fmt.Sprintf("%v/Pod%v%v.yaml", fcp.path, delimiter, pod.GetUID())
//getPodKey returns the full qualified path for the pod checkpoint
func getPodKey(pod *v1.Pod) string {
return fmt.Sprintf("Pod%v%v.yaml", delimiter, pod.GetUID())
}
// LoadPods Loads All Checkpoints from disk
func (fcp *fileCheckPointManager) LoadPods() ([]*v1.Pod, error) {
checkpoints := make([]*v1.Pod, 0)
files, err := ioutil.ReadDir(fcp.path)
func LoadPods(cpm checkpointmanager.CheckpointManager) ([]*v1.Pod, error) {
pods := make([]*v1.Pod, 0)
var err error
checkpointKeys := []string{}
checkpointKeys, err = cpm.ListCheckpoints()
if err != nil {
return nil, err
glog.Errorf("Failed to list checkpoints: %v", err)
}
for _, f := range files {
// get just the filename
_, fname := filepath.Split(f.Name())
// Get just the Resource from "Resource_Name"
fnfields := strings.Split(fname, delimiter)
switch fnfields[0] {
case podPrefix:
pod, err := fcp.loadPod(fmt.Sprintf("%s/%s", fcp.path, f.Name()))
if err != nil {
return nil, err
}
checkpoints = append(checkpoints, pod)
default:
glog.Warningf("Unsupported checkpoint file detected %v", f)
for _, key := range checkpointKeys {
checkpoint := NewPodCheckpoint(nil)
err := cpm.GetCheckpoint(key, checkpoint)
if err != nil {
glog.Errorf("Failed to retrieve checkpoint for pod %q: %v", key, err)
continue
}
pods = append(pods, checkpoint.GetPod())
}
return checkpoints, nil
return pods, nil
}
// Writes a checkpoint to a file on disk if annotation is present
func (fcp *fileCheckPointManager) WritePod(pod *v1.Pod) error {
// WritePod a checkpoint to a file on disk if annotation is present
func WritePod(cpm checkpointmanager.CheckpointManager, pod *v1.Pod) error {
var err error
if fcp.checkAnnotations(pod) {
if blob, err := yaml.Marshal(pod); err == nil {
err = safefile.WriteFile(fcp.getPodPath(pod), blob, 0644)
}
if checkAnnotations(pod) {
data := NewPodCheckpoint(pod)
err = cpm.CreateCheckpoint(getPodKey(pod), data)
} else {
// This is to handle an edge where a pod update could remove
// an annotation and the checkpoint should then be removed.
err = fcp.DeletePod(pod)
err = cpm.RemoveCheckpoint(getPodKey(pod))
}
return err
}
// Deletes a checkpoint from disk if present
func (fcp *fileCheckPointManager) DeletePod(pod *v1.Pod) error {
podPath := fcp.getPodPath(pod)
if err := os.Remove(podPath); !os.IsNotExist(err) {
return err
}
return nil
// DeletePod deletes a checkpoint from disk if present
func DeletePod(cpm checkpointmanager.CheckpointManager, pod *v1.Pod) error {
return cpm.RemoveCheckpoint(getPodKey(pod))
}

View File

@ -25,6 +25,7 @@ import (
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/kubernetes/pkg/apis/core"
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager"
)
// TestWriteLoadDeletePods validates all combinations of write, load, and delete
@ -70,15 +71,18 @@ func TestWriteLoadDeletePods(t *testing.T) {
}
defer os.RemoveAll(dir)
cp := NewCheckpointManager(dir)
cpm, err := checkpointmanager.NewCheckpointManager(dir)
if err != nil {
t.Errorf("Failed to initialize checkpoint manager error=%v", err)
}
for _, p := range testPods {
// Write pods should always pass unless there is an fs error
if err := cp.WritePod(p.pod); err != nil {
if err := WritePod(cpm, p.pod); err != nil {
t.Errorf("Failed to Write Pod: %v", err)
}
}
// verify the correct written files are loaded from disk
pods, err := cp.LoadPods()
pods, err := LoadPods(cpm)
if err != nil {
t.Errorf("Failed to Load Pods: %v", err)
}
@ -104,7 +108,7 @@ func TestWriteLoadDeletePods(t *testing.T) {
} else if lpod != nil {
t.Errorf("Got unexpected result for %v, should not have been loaded", pname)
}
err = cp.DeletePod(p.pod)
err = DeletePod(cpm, p.pod)
if err != nil {
t.Errorf("Failed to delete pod %v", pname)
}

View File

@ -22,7 +22,7 @@ go_test(
srcs = ["checkpoint_manager_test.go"],
embed = [":go_default_library"],
deps = [
"//pkg/kubelet/checkpointmanager/errors:go_default_library",
"//pkg/kubelet/checkpointmanager/checksum:go_default_library",
"//pkg/kubelet/checkpointmanager/testing:go_default_library",
"//pkg/kubelet/checkpointmanager/testing/example_checkpoint_formats/v1:go_default_library",
"//vendor/github.com/stretchr/testify/assert:go_default_library",

View File

@ -1,8 +1,9 @@
## DISCLAIMER
Sig-Node community has reached a general consensus, as a best practice, to
- Sig-Node community has reached a general consensus, as a best practice, to
avoid introducing any new checkpointing support. We reached this understanding
after struggling with some hard-to-debug issues in the production environments
caused by the checkpointing.
- Any changes to the checkpointed data structure would be considered incompatible and a component should add its own handling if it needs to ensure backward compatibility of reading old-format checkpoint files.
## Introduction
This folder contains a framework & primitives, Checkpointing Manager, which is

View File

@ -18,6 +18,7 @@ package checkpointmanager
import (
"fmt"
"sync"
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager/errors"
utilstore "k8s.io/kubernetes/pkg/kubelet/util/store"
@ -48,8 +49,10 @@ type CheckpointManager interface {
type impl struct {
path string
store utilstore.Store
mutex sync.Mutex
}
// NewCheckpointManager returns a new instance of a checkpoint manager
func NewCheckpointManager(checkpointDir string) (CheckpointManager, error) {
fstore, err := utilstore.NewFileStore(checkpointDir, utilfs.DefaultFs{})
if err != nil {
@ -88,6 +91,7 @@ func (manager *impl) GetCheckpoint(checkpointKey string, checkpoint Checkpoint)
return err
}
// RemoveCheckpoint will not return error if checkpoint does not exist.
func (manager *impl) RemoveCheckpoint(checkpointKey string) error {
manager.mutex.Lock()
defer manager.mutex.Unlock()

View File

@ -18,12 +18,11 @@ package checkpointmanager
import (
"encoding/json"
"hash/fnv"
"sort"
"testing"
"github.com/stretchr/testify/assert"
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager/errors"
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager/checksum"
utilstore "k8s.io/kubernetes/pkg/kubelet/checkpointmanager/testing"
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager/testing/example_checkpoint_formats/v1"
)
@ -50,7 +49,7 @@ type CheckpointDataV2 struct {
type protocol string
// portMapping is the port mapping configurations of a sandbox.
type portMapping struct {
type PortMapping struct {
// protocol of the port mapping.
Protocol *protocol
// Port number within the container.
@ -153,7 +152,7 @@ func TestCheckpointManager(t *testing.T) {
port443 := int32(443)
proto := protocol("tcp")
portMappings := []*portMapping{
portMappings := []*PortMapping{
{
&proto,
&port80,

View File

@ -18,5 +18,8 @@ package errors
import "fmt"
var CorruptCheckpointError = fmt.Errorf("checkpoint is corrupted.")
var CheckpointNotFoundError = fmt.Errorf("checkpoint is not found.")
// ErrCorruptCheckpoint error is reported when checksum does not match
var ErrCorruptCheckpoint = fmt.Errorf("checkpoint is corrupted")
// ErrCheckpointNotFound is reported when checkpoint is not found for a given key
var ErrCheckpointNotFound = fmt.Errorf("checkpoint is not found")

View File

@ -27,10 +27,12 @@ type MemStore struct {
sync.Mutex
}
// NewMemStore returns an instance of MemStore
func NewMemStore() *MemStore {
return &MemStore{mem: make(map[string][]byte)}
}
// Write writes the data to the store
func (mstore *MemStore) Write(key string, data []byte) error {
mstore.Lock()
defer mstore.Unlock()
@ -38,6 +40,7 @@ func (mstore *MemStore) Write(key string, data []byte) error {
return nil
}
// Read returns data read from store
func (mstore *MemStore) Read(key string) ([]byte, error) {
mstore.Lock()
defer mstore.Unlock()
@ -48,6 +51,7 @@ func (mstore *MemStore) Read(key string) ([]byte, error) {
return data, nil
}
// Delete deletes data from the store
func (mstore *MemStore) Delete(key string) error {
mstore.Lock()
defer mstore.Unlock()
@ -55,6 +59,7 @@ func (mstore *MemStore) Delete(key string) error {
return nil
}
// List returns all the keys from the store
func (mstore *MemStore) List() ([]string, error) {
mstore.Lock()
defer mstore.Unlock()

View File

@ -17,7 +17,6 @@ limitations under the License.
package devicemanager
import (
"flag"
"fmt"
"io/ioutil"
"os"
@ -548,7 +547,6 @@ type TestResource struct {
}
func TestPodContainerDeviceAllocation(t *testing.T) {
flag.Set("alsologtostderr", fmt.Sprintf("%t", true))
res1 := TestResource{
resourceName: "domain1.com/resource1",
resourceQuantity: *resource.NewQuantity(int64(2), resource.DecimalSI),
@ -753,7 +751,7 @@ func TestSanitizeNodeAllocatable(t *testing.T) {
as.Nil(err)
testManager := &ManagerImpl{
callback: monitorCallback,
allDevices: make(map[string]sets.String),
allocatedDevices: make(map[string]sets.String),
healthyDevices: make(map[string]sets.String),
podDevices: make(podDevices),
checkpointManager: ckm,

View File

@ -144,7 +144,12 @@ func (pdev podDevices) toCheckpointData() []checkpoint.PodDevicesEntry {
glog.Errorf("Can't marshal allocResp for %v %v %v: %v", podUID, conName, resource, err)
continue
}
data = append(data, checkpoint.PodDevicesEntry{podUID, conName, resource, devIds, allocResp})
data = append(data, checkpoint.PodDevicesEntry{
PodUID: podUID,
ContainerName: conName,
ResourceName: resource,
DeviceIDs: devIds,
AllocResp: allocResp})
}
}
}

View File

@ -58,6 +58,7 @@ go_library(
"//pkg/apis/core/v1:go_default_library",
"//pkg/apis/core/validation:go_default_library",
"//pkg/kubelet/checkpoint:go_default_library",
"//pkg/kubelet/checkpointmanager:go_default_library",
"//pkg/kubelet/container:go_default_library",
"//pkg/kubelet/events:go_default_library",
"//pkg/kubelet/types:go_default_library",

View File

@ -27,6 +27,7 @@ import (
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/tools/record"
"k8s.io/kubernetes/pkg/kubelet/checkpoint"
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/events"
kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
@ -64,7 +65,7 @@ type PodConfig struct {
// contains the list of all configured sources
sourcesLock sync.Mutex
sources sets.String
checkpointManager checkpoint.Manager
checkpointManager checkpointmanager.CheckpointManager
}
// NewPodConfig creates an object that can merge many configuration sources into a stream
@ -114,10 +115,12 @@ func (c *PodConfig) Sync() {
func (c *PodConfig) Restore(path string, updates chan<- interface{}) error {
var err error
if c.checkpointManager == nil {
c.checkpointManager = checkpoint.NewCheckpointManager(path)
pods, err := c.checkpointManager.LoadPods()
if err == nil {
updates <- kubetypes.PodUpdate{Pods: pods, Op: kubetypes.RESTORE, Source: kubetypes.ApiserverSource}
c.checkpointManager, err = checkpointmanager.NewCheckpointManager(path)
if err != nil {
pods, err := checkpoint.LoadPods(c.checkpointManager)
if err == nil {
updates <- kubetypes.PodUpdate{Pods: pods, Op: kubetypes.RESTORE, Source: kubetypes.ApiserverSource}
}
}
}
return err

View File

@ -155,7 +155,6 @@ go_test(
"//pkg/kubelet/dockershim/libdocker:go_default_library",
"//pkg/kubelet/dockershim/network:go_default_library",
"//pkg/kubelet/dockershim/network/testing:go_default_library",
"//pkg/kubelet/dockershim/testing:go_default_library",
"//pkg/kubelet/types:go_default_library",
"//pkg/kubelet/util/cache:go_default_library",
"//pkg/security/apparmor:go_default_library",

View File

@ -68,7 +68,7 @@ func (ckm *mockCheckpointManager) RemoveCheckpoint(checkpointKey string) error {
func (ckm *mockCheckpointManager) ListCheckpoints() ([]string, error) {
var keys []string
for key, _ := range ckm.checkpoint {
for key := range ckm.checkpoint {
keys = append(keys, key)
}
return keys, nil

View File

@ -61,6 +61,7 @@ import (
kubeletconfiginternal "k8s.io/kubernetes/pkg/kubelet/apis/kubeletconfig"
"k8s.io/kubernetes/pkg/kubelet/cadvisor"
kubeletcertificate "k8s.io/kubernetes/pkg/kubelet/certificate"
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager"
"k8s.io/kubernetes/pkg/kubelet/cm"
"k8s.io/kubernetes/pkg/kubelet/config"
"k8s.io/kubernetes/pkg/kubelet/configmap"
@ -553,8 +554,15 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
klet.livenessManager = proberesults.NewManager()
klet.podCache = kubecontainer.NewCache()
var checkpointManager checkpointmanager.CheckpointManager
if bootstrapCheckpointPath != "" {
checkpointManager, err = checkpointmanager.NewCheckpointManager(bootstrapCheckpointPath)
if err != nil {
return nil, fmt.Errorf("failed to initialize checkpoint manager: %+v", err)
}
}
// podManager is also responsible for keeping secretManager and configMapManager contents up-to-date.
klet.podManager = kubepod.NewBasicPodManager(kubepod.NewBasicMirrorClient(klet.kubeClient), secretManager, configMapManager)
klet.podManager = kubepod.NewBasicPodManager(kubepod.NewBasicMirrorClient(klet.kubeClient), secretManager, configMapManager, checkpointManager)
if remoteRuntimeEndpoint != "" {
// remoteImageEndpoint is same as remoteRuntimeEndpoint if not explicitly specified

View File

@ -215,7 +215,7 @@ func newTestKubeletWithImageList(
kubelet.secretManager = secretManager
configMapManager := configmap.NewSimpleConfigMapManager(kubelet.kubeClient)
kubelet.configMapManager = configMapManager
kubelet.podManager = kubepod.NewBasicPodManager(fakeMirrorClient, kubelet.secretManager, kubelet.configMapManager)
kubelet.podManager = kubepod.NewBasicPodManager(fakeMirrorClient, kubelet.secretManager, kubelet.configMapManager, podtest.NewMockCheckpointManager())
kubelet.statusManager = status.NewManager(fakeKubeClient, kubelet.podManager, &statustest.FakePodDeletionSafetyProvider{})
kubelet.containerRuntime = fakeRuntime

View File

@ -52,7 +52,7 @@ func TestGetVolumeExec(t *testing.T) {
fakeSecretManager := secret.NewFakeManager()
fakeConfigMapManager := configmap.NewFakeManager()
podManager := kubepod.NewBasicPodManager(
podtest.NewFakeMirrorClient(), fakeSecretManager, fakeConfigMapManager)
podtest.NewFakeMirrorClient(), fakeSecretManager, fakeConfigMapManager, podtest.NewMockCheckpointManager())
podManager.SetPods(pods)
// Prepare fake /var/lib/kubelet

View File

@ -15,6 +15,7 @@ go_library(
importpath = "k8s.io/kubernetes/pkg/kubelet/pod",
deps = [
"//pkg/kubelet/checkpoint:go_default_library",
"//pkg/kubelet/checkpointmanager:go_default_library",
"//pkg/kubelet/configmap:go_default_library",
"//pkg/kubelet/container:go_default_library",
"//pkg/kubelet/secret:go_default_library",

View File

@ -24,6 +24,7 @@ import (
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/kubernetes/pkg/kubelet/checkpoint"
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager"
"k8s.io/kubernetes/pkg/kubelet/configmap"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/secret"
@ -121,18 +122,18 @@ type basicManager struct {
// basicManager is keeping secretManager and configMapManager up-to-date.
secretManager secret.Manager
configMapManager configmap.Manager
checkpointManager checkpoint.Manager
checkpointManager checkpointmanager.CheckpointManager
// A mirror pod client to create/delete mirror pods.
MirrorClient
}
// NewBasicPodManager returns a functional Manager.
func NewBasicPodManager(client MirrorClient, secretManager secret.Manager, configMapManager configmap.Manager) Manager {
func NewBasicPodManager(client MirrorClient, secretManager secret.Manager, configMapManager configmap.Manager, cpm checkpointmanager.CheckpointManager) Manager {
pm := &basicManager{}
pm.secretManager = secretManager
pm.configMapManager = configMapManager
pm.checkpointManager = checkpoint.GetInstance()
pm.checkpointManager = cpm
pm.MirrorClient = client
pm.SetPods(nil)
return pm
@ -161,7 +162,7 @@ func (pm *basicManager) UpdatePod(pod *v1.Pod) {
defer pm.lock.Unlock()
pm.updatePodsInternal(pod)
if pm.checkpointManager != nil {
if err := pm.checkpointManager.WritePod(pod); err != nil {
if err := checkpoint.WritePod(pm.checkpointManager, pod); err != nil {
glog.Errorf("Error writing checkpoint for pod: %v", pod.GetName())
}
}
@ -224,7 +225,7 @@ func (pm *basicManager) DeletePod(pod *v1.Pod) {
delete(pm.podByFullName, podFullName)
}
if pm.checkpointManager != nil {
if err := pm.checkpointManager.DeletePod(pod); err != nil {
if err := checkpoint.DeletePod(pm.checkpointManager, pod); err != nil {
glog.Errorf("Error deleting checkpoint for pod: %v", pod.GetName())
}
}

View File

@ -34,7 +34,7 @@ func newTestManager() (*basicManager, *podtest.FakeMirrorClient) {
fakeMirrorClient := podtest.NewFakeMirrorClient()
secretManager := secret.NewFakeManager()
configMapManager := configmap.NewFakeManager()
manager := NewBasicPodManager(fakeMirrorClient, secretManager, configMapManager).(*basicManager)
manager := NewBasicPodManager(fakeMirrorClient, secretManager, configMapManager, podtest.NewMockCheckpointManager()).(*basicManager)
return manager, fakeMirrorClient
}

View File

@ -13,6 +13,8 @@ go_library(
],
importpath = "k8s.io/kubernetes/pkg/kubelet/pod/testing",
deps = [
"//pkg/kubelet/checkpoint:go_default_library",
"//pkg/kubelet/checkpointmanager:go_default_library",
"//pkg/kubelet/container:go_default_library",
"//pkg/kubelet/types:go_default_library",
"//vendor/github.com/stretchr/testify/mock:go_default_library",

View File

@ -21,6 +21,8 @@ import (
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/sets"
cp "k8s.io/kubernetes/pkg/kubelet/checkpoint"
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
)
@ -81,3 +83,37 @@ func (fmc *FakeMirrorClient) GetCounts(podFullName string) (int, int) {
defer fmc.mirrorPodLock.RUnlock()
return fmc.createCounts[podFullName], fmc.deleteCounts[podFullName]
}
type MockCheckpointManager struct {
checkpoint map[string]*cp.Data
}
func (ckm *MockCheckpointManager) CreateCheckpoint(checkpointKey string, checkpoint checkpointmanager.Checkpoint) error {
ckm.checkpoint[checkpointKey] = (checkpoint.(*cp.Data))
return nil
}
func (ckm *MockCheckpointManager) GetCheckpoint(checkpointKey string, checkpoint checkpointmanager.Checkpoint) error {
*(checkpoint.(*cp.Data)) = *(ckm.checkpoint[checkpointKey])
return nil
}
func (ckm *MockCheckpointManager) RemoveCheckpoint(checkpointKey string) error {
_, ok := ckm.checkpoint[checkpointKey]
if ok {
delete(ckm.checkpoint, "moo")
}
return nil
}
func (ckm *MockCheckpointManager) ListCheckpoints() ([]string, error) {
var keys []string
for key := range ckm.checkpoint {
keys = append(keys, key)
}
return keys, nil
}
func NewMockCheckpointManager() checkpointmanager.CheckpointManager {
return &MockCheckpointManager{checkpoint: make(map[string]*cp.Data)}
}

View File

@ -99,7 +99,7 @@ func setTestProbe(pod *v1.Pod, probeType probeType, probeSpec v1.Probe) {
func newTestManager() *manager {
refManager := kubecontainer.NewRefManager()
refManager.SetRef(testContainerID, &v1.ObjectReference{}) // Suppress prober warnings.
podManager := kubepod.NewBasicPodManager(nil, nil, nil)
podManager := kubepod.NewBasicPodManager(nil, nil, nil, nil)
// Add test pod to pod manager, so that status manager can get the pod from pod manager if needed.
podManager.AddPod(getTestPod())
m := NewManager(

View File

@ -118,7 +118,7 @@ func TestDoProbe(t *testing.T) {
}
// Clean up.
m.statusManager = status.NewManager(&fake.Clientset{}, kubepod.NewBasicPodManager(nil, nil, nil), &statustest.FakePodDeletionSafetyProvider{})
m.statusManager = status.NewManager(&fake.Clientset{}, kubepod.NewBasicPodManager(nil, nil, nil, nil), &statustest.FakePodDeletionSafetyProvider{})
resultsManager(m, probeType).Remove(testContainerID)
}
}

View File

@ -63,7 +63,7 @@ func TestRunOnce(t *testing.T) {
fakeSecretManager := secret.NewFakeManager()
fakeConfigMapManager := configmap.NewFakeManager()
podManager := kubepod.NewBasicPodManager(
podtest.NewFakeMirrorClient(), fakeSecretManager, fakeConfigMapManager)
podtest.NewFakeMirrorClient(), fakeSecretManager, fakeConfigMapManager, podtest.NewMockCheckpointManager())
fakeRuntime := &containertest.FakeRuntime{}
basePath, err := utiltesting.MkTmpdir("kubelet")
if err != nil {

View File

@ -75,7 +75,7 @@ func (m *manager) testSyncBatch() {
}
func newTestManager(kubeClient clientset.Interface) *manager {
podManager := kubepod.NewBasicPodManager(podtest.NewFakeMirrorClient(), kubesecret.NewFakeManager(), kubeconfigmap.NewFakeManager())
podManager := kubepod.NewBasicPodManager(podtest.NewFakeMirrorClient(), kubesecret.NewFakeManager(), kubeconfigmap.NewFakeManager(), podtest.NewMockCheckpointManager())
podManager.AddPod(getTestPod())
return NewManager(kubeClient, podManager, &statustest.FakePodDeletionSafetyProvider{}).(*manager)
}

View File

@ -523,7 +523,7 @@ func createDswpWithVolume(t *testing.T, pv *v1.PersistentVolume, pvc *v1.Persist
fakeSecretManager := secret.NewFakeManager()
fakeConfigMapManager := configmap.NewFakeManager()
fakePodManager := kubepod.NewBasicPodManager(
podtest.NewFakeMirrorClient(), fakeSecretManager, fakeConfigMapManager)
podtest.NewFakeMirrorClient(), fakeSecretManager, fakeConfigMapManager, podtest.NewMockCheckpointManager())
fakesDSW := cache.NewDesiredStateOfWorld(fakeVolumePluginMgr)
fakeASW := cache.NewActualStateOfWorld("fake", fakeVolumePluginMgr)

View File

@ -55,7 +55,8 @@ func TestGetMountedVolumesForPodAndGetVolumesInUse(t *testing.T) {
t.Fatalf("can't make a temp dir: %v", err)
}
defer os.RemoveAll(tmpDir)
podManager := kubepod.NewBasicPodManager(podtest.NewFakeMirrorClient(), secret.NewFakeManager(), configmap.NewFakeManager())
cpm := podtest.NewMockCheckpointManager()
podManager := kubepod.NewBasicPodManager(podtest.NewFakeMirrorClient(), secret.NewFakeManager(), configmap.NewFakeManager(), cpm)
node, pod, pv, claim := createObjects()
kubeClient := fake.NewSimpleClientset(node, pod, pv, claim)
@ -97,7 +98,8 @@ func TestInitialPendingVolumesForPodAndGetVolumesInUse(t *testing.T) {
t.Fatalf("can't make a temp dir: %v", err)
}
defer os.RemoveAll(tmpDir)
podManager := kubepod.NewBasicPodManager(podtest.NewFakeMirrorClient(), secret.NewFakeManager(), configmap.NewFakeManager())
cpm := podtest.NewMockCheckpointManager()
podManager := kubepod.NewBasicPodManager(podtest.NewFakeMirrorClient(), secret.NewFakeManager(), configmap.NewFakeManager(), cpm)
node, pod, pv, claim := createObjects()
claim.Status = v1.PersistentVolumeClaimStatus{
@ -135,7 +137,8 @@ func TestGetExtraSupplementalGroupsForPod(t *testing.T) {
t.Fatalf("can't make a temp dir: %v", err)
}
defer os.RemoveAll(tmpDir)
podManager := kubepod.NewBasicPodManager(podtest.NewFakeMirrorClient(), secret.NewFakeManager(), configmap.NewFakeManager())
cpm := podtest.NewMockCheckpointManager()
podManager := kubepod.NewBasicPodManager(podtest.NewFakeMirrorClient(), secret.NewFakeManager(), configmap.NewFakeManager(), cpm)
node, pod, _, claim := createObjects()