Merge pull request #56040 from vikaschoudhary16/ckmngr

Automatic merge from submit-queue (batch tested with PRs 56040, 62627). If you want to cherry-pick this change to another branch, please follow the instructions [here](https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md).

Node-level Checkpointing manager: Migrate dockershim and device plugin manager checkpointing  

**What this PR does / why we need it**:
This PR abstracts a checkpoint manager at the kubelet level. Currently, `dockershim` and `deviceplugin` each have their own native checkpointing primitives, and most recently `cpumanager` added its own package-native checkpointing as well. This is redundant at the implementation level and degrades code readability and consistency.

To address this:

1.  A checkpointing interface is abstracted at the kubelet level as the `checkpointmanager` package.
2.  The `dockershim` and `deviceplugin` packages are modified to use `checkpointmanager` instead of their native checkpointing (a usage sketch follows below).
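
For illustration, here is a minimal sketch of how a migrated submodule would persist and restore its state through the new package, assuming the `checkpointmanager` and `checkpointmanager/checksum` APIs introduced in this PR; the `exampleCheckpoint` type, the `saveAndRestore` helper, and the `"example_checkpoint"` key are hypothetical names, not part of the change:

```go
package example

import (
	"encoding/json"

	"k8s.io/kubernetes/pkg/kubelet/checkpointmanager"
	"k8s.io/kubernetes/pkg/kubelet/checkpointmanager/checksum"
)

// exampleCheckpoint is a hypothetical submodule payload. It satisfies the
// checkpointmanager.Checkpoint interface by marshalling itself to JSON and
// delegating integrity checking to the shared checksum package.
type exampleCheckpoint struct {
	Entries  []string          `json:"entries,omitempty"`
	Checksum checksum.Checksum `json:"checksum"`
}

// MarshalCheckpoint computes the checksum over the payload and serializes it.
func (cp *exampleCheckpoint) MarshalCheckpoint() ([]byte, error) {
	cp.Checksum = checksum.New(cp.Entries)
	return json.Marshal(*cp)
}

// UnmarshalCheckpoint deserializes a previously written checkpoint.
func (cp *exampleCheckpoint) UnmarshalCheckpoint(blob []byte) error {
	return json.Unmarshal(blob, cp)
}

// VerifyChecksum recomputes the checksum and compares it to the stored one.
func (cp *exampleCheckpoint) VerifyChecksum() error {
	return cp.Checksum.Verify(cp.Entries)
}

// saveAndRestore shows the round trip a migrated submodule performs instead
// of writing checkpoint files directly.
func saveAndRestore(dir string) ([]string, error) {
	cpm, err := checkpointmanager.NewCheckpointManager(dir)
	if err != nil {
		return nil, err
	}
	// Persist current state under a submodule-chosen key (the file name).
	if err := cpm.CreateCheckpoint("example_checkpoint",
		&exampleCheckpoint{Entries: []string{"a", "b"}}); err != nil {
		return nil, err
	}
	// Restore on restart; GetCheckpoint unmarshals and verifies the checksum.
	restored := &exampleCheckpoint{}
	if err := cpm.GetCheckpoint("example_checkpoint", restored); err != nil {
		return nil, err
	}
	return restored.Entries, nil
}
```

The same pattern is what the dockershim and device manager code adopts in the diffs below.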

**Which issue(s) this PR fixes** *(optional, in `fixes #<issue number>(, fixes #<issue_number>, ...)` format, will close the issue(s) when PR gets merged)*:
Fixes #



```release-note
None
```
cc @jeremyeder @vishh @derekwaynecarr @sjenning @yujuhong @dchen1107 @RenaudWasTaken @ConnorDoyle @jiayingz @mindprince @timstclair
/sig node
This commit is contained in:
Kubernetes Submit Queue 2018-04-16 15:02:08 -07:00 committed by GitHub
commit 83ee626561
46 changed files with 1092 additions and 395 deletions

View File

@ -163,8 +163,12 @@ pkg/kubelet/apis/kubeletconfig
pkg/kubelet/apis/kubeletconfig/v1beta1 pkg/kubelet/apis/kubeletconfig/v1beta1
pkg/kubelet/cadvisor pkg/kubelet/cadvisor
pkg/kubelet/cadvisor/testing pkg/kubelet/cadvisor/testing
pkg/kubelet/checkpoint
pkg/kubelet/checkpointmanager/checksum
pkg/kubelet/checkpointmanager/testing/example_checkpoint_formats/v1
pkg/kubelet/client pkg/kubelet/client
pkg/kubelet/cm pkg/kubelet/cm
pkg/kubelet/cm/devicemanager/checkpoint
pkg/kubelet/cm/util pkg/kubelet/cm/util
pkg/kubelet/config pkg/kubelet/config
pkg/kubelet/configmap pkg/kubelet/configmap
@ -181,7 +185,6 @@ pkg/kubelet/dockershim/network/hostport
pkg/kubelet/dockershim/network/hostport/testing pkg/kubelet/dockershim/network/hostport/testing
pkg/kubelet/dockershim/network/kubenet pkg/kubelet/dockershim/network/kubenet
pkg/kubelet/dockershim/network/testing pkg/kubelet/dockershim/network/testing
pkg/kubelet/dockershim/testing
pkg/kubelet/events pkg/kubelet/events
pkg/kubelet/images pkg/kubelet/images
pkg/kubelet/kuberuntime pkg/kubelet/kuberuntime

View File

@ -46,6 +46,7 @@ go_library(
"//pkg/kubelet/apis/kubeletconfig:go_default_library", "//pkg/kubelet/apis/kubeletconfig:go_default_library",
"//pkg/kubelet/cadvisor:go_default_library", "//pkg/kubelet/cadvisor:go_default_library",
"//pkg/kubelet/certificate:go_default_library", "//pkg/kubelet/certificate:go_default_library",
"//pkg/kubelet/checkpointmanager:go_default_library",
"//pkg/kubelet/cm:go_default_library", "//pkg/kubelet/cm:go_default_library",
"//pkg/kubelet/config:go_default_library", "//pkg/kubelet/config:go_default_library",
"//pkg/kubelet/configmap:go_default_library", "//pkg/kubelet/configmap:go_default_library",
@ -246,6 +247,7 @@ filegroup(
"//pkg/kubelet/cadvisor:all-srcs", "//pkg/kubelet/cadvisor:all-srcs",
"//pkg/kubelet/certificate:all-srcs", "//pkg/kubelet/certificate:all-srcs",
"//pkg/kubelet/checkpoint:all-srcs", "//pkg/kubelet/checkpoint:all-srcs",
"//pkg/kubelet/checkpointmanager:all-srcs",
"//pkg/kubelet/client:all-srcs", "//pkg/kubelet/client:all-srcs",
"//pkg/kubelet/cm:all-srcs", "//pkg/kubelet/cm:all-srcs",
"//pkg/kubelet/config:all-srcs", "//pkg/kubelet/config:all-srcs",

View File

@ -7,9 +7,8 @@ go_library(
visibility = ["//visibility:public"], visibility = ["//visibility:public"],
deps = [ deps = [
"//pkg/apis/core:go_default_library", "//pkg/apis/core:go_default_library",
"//pkg/volume/util:go_default_library", "//pkg/kubelet/checkpointmanager:go_default_library",
"//vendor/github.com/dchest/safefile:go_default_library", "//pkg/kubelet/checkpointmanager/checksum:go_default_library",
"//vendor/github.com/ghodss/yaml:go_default_library",
"//vendor/github.com/golang/glog:go_default_library", "//vendor/github.com/golang/glog:go_default_library",
"//vendor/k8s.io/api/core/v1:go_default_library", "//vendor/k8s.io/api/core/v1:go_default_library",
], ],
@ -21,6 +20,7 @@ go_test(
embed = [":go_default_library"], embed = [":go_default_library"],
deps = [ deps = [
"//pkg/apis/core:go_default_library", "//pkg/apis/core:go_default_library",
"//pkg/kubelet/checkpointmanager:go_default_library",
"//vendor/k8s.io/api/core/v1:go_default_library", "//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
], ],

View File

@ -17,20 +17,15 @@ limitations under the License.
package checkpoint package checkpoint
import ( import (
"encoding/json"
"fmt" "fmt"
"io/ioutil"
"os"
"path/filepath"
"strings"
"sync"
"github.com/dchest/safefile"
"github.com/ghodss/yaml"
"github.com/golang/glog" "github.com/golang/glog"
"k8s.io/api/core/v1" "k8s.io/api/core/v1"
"k8s.io/kubernetes/pkg/apis/core" "k8s.io/kubernetes/pkg/apis/core"
"k8s.io/kubernetes/pkg/volume/util" "k8s.io/kubernetes/pkg/kubelet/checkpointmanager"
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager/checksum"
) )
const ( const (
@ -39,54 +34,44 @@ const (
podPrefix = "Pod" podPrefix = "Pod"
) )
// Manager is the interface used to manage checkpoints type PodCheckpoint interface {
// which involves writing resources to disk to recover checkpointmanager.Checkpoint
// during restart or failure scenarios. GetPod() *v1.Pod
// https://github.com/kubernetes/community/pull/1241/files
type Manager interface {
// LoadPods will load checkpointed Pods from disk
LoadPods() ([]*v1.Pod, error)
// WritePod will serialize a Pod to disk
WritePod(pod *v1.Pod) error
// Deletes the checkpoint of the given pod from disk
DeletePod(pod *v1.Pod) error
} }
var instance Manager // Data to be stored as checkpoint
var mutex = &sync.Mutex{} type Data struct {
Pod *v1.Pod
// fileCheckPointManager - is a checkpointer that writes contents to disk Checksum checksum.Checksum
// The type information of the resource objects are encoded in the name
type fileCheckPointManager struct {
path string
} }
// NewCheckpointManager will create a Manager that points to the following path // NewPodCheckpoint returns new pod checkpoint
func NewCheckpointManager(path string) Manager { func NewPodCheckpoint(pod *v1.Pod) PodCheckpoint {
// NOTE: This is a precaution; current implementation should not run return &Data{Pod: pod}
// multiple checkpoint managers.
mutex.Lock()
defer mutex.Unlock()
instance = &fileCheckPointManager{path: path}
return instance
} }
// GetInstance will return the current Manager, there should be only one. // MarshalCheckpoint returns marshalled data
func GetInstance() Manager { func (cp *Data) MarshalCheckpoint() ([]byte, error) {
mutex.Lock() cp.Checksum = checksum.New(*cp.Pod)
defer mutex.Unlock() return json.Marshal(*cp)
return instance
} }
// loadPod will load Pod Checkpoint yaml file. // UnmarshalCheckpoint returns unmarshalled data
func (fcp *fileCheckPointManager) loadPod(file string) (*v1.Pod, error) { func (cp *Data) UnmarshalCheckpoint(blob []byte) error {
return util.LoadPodFromFile(file) return json.Unmarshal(blob, cp)
}
// VerifyChecksum verifies that passed checksum is same as calculated checksum
func (cp *Data) VerifyChecksum() error {
return cp.Checksum.Verify(*cp.Pod)
}
func (cp *Data) GetPod() *v1.Pod {
return cp.Pod
} }
// checkAnnotations will validate the checkpoint annotations exist on the Pod // checkAnnotations will validate the checkpoint annotations exist on the Pod
func (fcp *fileCheckPointManager) checkAnnotations(pod *v1.Pod) bool { func checkAnnotations(pod *v1.Pod) bool {
if podAnnotations := pod.GetAnnotations(); podAnnotations != nil { if podAnnotations := pod.GetAnnotations(); podAnnotations != nil {
if podAnnotations[core.BootstrapCheckpointAnnotationKey] == "true" { if podAnnotations[core.BootstrapCheckpointAnnotationKey] == "true" {
return true return true
@ -95,57 +80,49 @@ func (fcp *fileCheckPointManager) checkAnnotations(pod *v1.Pod) bool {
return false return false
} }
// getPodPath returns the full qualified path for the pod checkpoint //getPodKey returns the full qualified path for the pod checkpoint
func (fcp *fileCheckPointManager) getPodPath(pod *v1.Pod) string { func getPodKey(pod *v1.Pod) string {
return fmt.Sprintf("%v/Pod%v%v.yaml", fcp.path, delimiter, pod.GetUID()) return fmt.Sprintf("Pod%v%v.yaml", delimiter, pod.GetUID())
} }
// LoadPods Loads All Checkpoints from disk // LoadPods Loads All Checkpoints from disk
func (fcp *fileCheckPointManager) LoadPods() ([]*v1.Pod, error) { func LoadPods(cpm checkpointmanager.CheckpointManager) ([]*v1.Pod, error) {
checkpoints := make([]*v1.Pod, 0) pods := make([]*v1.Pod, 0)
files, err := ioutil.ReadDir(fcp.path)
var err error
checkpointKeys := []string{}
checkpointKeys, err = cpm.ListCheckpoints()
if err != nil { if err != nil {
return nil, err glog.Errorf("Failed to list checkpoints: %v", err)
} }
for _, f := range files {
// get just the filename for _, key := range checkpointKeys {
_, fname := filepath.Split(f.Name()) checkpoint := NewPodCheckpoint(nil)
// Get just the Resource from "Resource_Name" err := cpm.GetCheckpoint(key, checkpoint)
fnfields := strings.Split(fname, delimiter)
switch fnfields[0] {
case podPrefix:
pod, err := fcp.loadPod(fmt.Sprintf("%s/%s", fcp.path, f.Name()))
if err != nil { if err != nil {
return nil, err glog.Errorf("Failed to retrieve checkpoint for pod %q: %v", key, err)
continue
} }
checkpoints = append(checkpoints, pod) pods = append(pods, checkpoint.GetPod())
default:
glog.Warningf("Unsupported checkpoint file detected %v", f)
} }
} return pods, nil
return checkpoints, nil
} }
// Writes a checkpoint to a file on disk if annotation is present // WritePod a checkpoint to a file on disk if annotation is present
func (fcp *fileCheckPointManager) WritePod(pod *v1.Pod) error { func WritePod(cpm checkpointmanager.CheckpointManager, pod *v1.Pod) error {
var err error var err error
if fcp.checkAnnotations(pod) { if checkAnnotations(pod) {
if blob, err := yaml.Marshal(pod); err == nil { data := NewPodCheckpoint(pod)
err = safefile.WriteFile(fcp.getPodPath(pod), blob, 0644) err = cpm.CreateCheckpoint(getPodKey(pod), data)
}
} else { } else {
// This is to handle an edge where a pod update could remove // This is to handle an edge where a pod update could remove
// an annotation and the checkpoint should then be removed. // an annotation and the checkpoint should then be removed.
err = fcp.DeletePod(pod) err = cpm.RemoveCheckpoint(getPodKey(pod))
} }
return err return err
} }
// Deletes a checkpoint from disk if present // DeletePod deletes a checkpoint from disk if present
func (fcp *fileCheckPointManager) DeletePod(pod *v1.Pod) error { func DeletePod(cpm checkpointmanager.CheckpointManager, pod *v1.Pod) error {
podPath := fcp.getPodPath(pod) return cpm.RemoveCheckpoint(getPodKey(pod))
if err := os.Remove(podPath); !os.IsNotExist(err) {
return err
}
return nil
} }

View File

@ -25,6 +25,7 @@ import (
"k8s.io/api/core/v1" "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/kubernetes/pkg/apis/core" "k8s.io/kubernetes/pkg/apis/core"
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager"
) )
// TestWriteLoadDeletePods validates all combinations of write, load, and delete // TestWriteLoadDeletePods validates all combinations of write, load, and delete
@ -70,15 +71,18 @@ func TestWriteLoadDeletePods(t *testing.T) {
} }
defer os.RemoveAll(dir) defer os.RemoveAll(dir)
cp := NewCheckpointManager(dir) cpm, err := checkpointmanager.NewCheckpointManager(dir)
if err != nil {
t.Errorf("Failed to initialize checkpoint manager error=%v", err)
}
for _, p := range testPods { for _, p := range testPods {
// Write pods should always pass unless there is an fs error // Write pods should always pass unless there is an fs error
if err := cp.WritePod(p.pod); err != nil { if err := WritePod(cpm, p.pod); err != nil {
t.Errorf("Failed to Write Pod: %v", err) t.Errorf("Failed to Write Pod: %v", err)
} }
} }
// verify the correct written files are loaded from disk // verify the correct written files are loaded from disk
pods, err := cp.LoadPods() pods, err := LoadPods(cpm)
if err != nil { if err != nil {
t.Errorf("Failed to Load Pods: %v", err) t.Errorf("Failed to Load Pods: %v", err)
} }
@ -104,7 +108,7 @@ func TestWriteLoadDeletePods(t *testing.T) {
} else if lpod != nil { } else if lpod != nil {
t.Errorf("Got unexpected result for %v, should not have been loaded", pname) t.Errorf("Got unexpected result for %v, should not have been loaded", pname)
} }
err = cp.DeletePod(p.pod) err = DeletePod(cpm, p.pod)
if err != nil { if err != nil {
t.Errorf("Failed to delete pod %v", pname) t.Errorf("Failed to delete pod %v", pname)
} }

View File

@ -0,0 +1,48 @@
package(default_visibility = ["//visibility:public"])
load(
"@io_bazel_rules_go//go:def.bzl",
"go_library",
"go_test",
)
go_library(
name = "go_default_library",
srcs = ["checkpoint_manager.go"],
importpath = "k8s.io/kubernetes/pkg/kubelet/checkpointmanager",
deps = [
"//pkg/kubelet/checkpointmanager/errors:go_default_library",
"//pkg/kubelet/util/store:go_default_library",
"//pkg/util/filesystem:go_default_library",
],
)
go_test(
name = "go_default_test",
srcs = ["checkpoint_manager_test.go"],
embed = [":go_default_library"],
deps = [
"//pkg/kubelet/checkpointmanager/checksum:go_default_library",
"//pkg/kubelet/checkpointmanager/testing:go_default_library",
"//pkg/kubelet/checkpointmanager/testing/example_checkpoint_formats/v1:go_default_library",
"//vendor/github.com/stretchr/testify/assert:go_default_library",
],
)
filegroup(
name = "package-srcs",
srcs = glob(["**"]),
tags = ["automanaged"],
visibility = ["//visibility:private"],
)
filegroup(
name = "all-srcs",
srcs = [
":package-srcs",
"//pkg/kubelet/checkpointmanager/checksum:all-srcs",
"//pkg/kubelet/checkpointmanager/errors:all-srcs",
"//pkg/kubelet/checkpointmanager/testing:all-srcs",
],
tags = ["automanaged"],
)

View File

@ -0,0 +1,25 @@
## DISCLAIMER
- The SIG Node community has reached a general consensus, as a best practice, to
avoid introducing any new checkpointing support. We reached this understanding
after struggling with some hard-to-debug issues in production environments
caused by checkpointing.
- Any change to a checkpointed data structure is considered incompatible; a component must add its own handling if it needs to remain able to read old-format checkpoint files.
## Introduction
This folder contains the Checkpointing Manager, a framework and set of primitives
used by several other Kubelet submodules (`dockershim`, `devicemanager`, `pods`,
and `cpumanager`) to implement checkpointing at the submodule level. As explained
in the `Disclaimer` section above, think twice before introducing any further
checkpointing in the Kubelet. If checkpointing is still required, this folder
provides the common APIs and the framework for implementing it. Using the same
APIs across all submodules helps maintain consistency at the Kubelet level.
Below is the history of checkpointing support in Kubelet.
| Package | First checkpointing support merged on | PR link |
| ------- | --------------------------------------| ------- |
|kubelet/dockershim | Feb 3, 2017 | [[CRI] Implement Dockershim Checkpoint](https://github.com/kubernetes/kubernetes/pull/39903)
|devicemanager| Sep 6, 2017 | [Deviceplugin checkpoint](https://github.com/kubernetes/kubernetes/pull/51744)
| kubelet/pod | Nov 22, 2017 | [Initial basic bootstrap-checkpoint support](https://github.com/kubernetes/kubernetes/pull/50984)
|cpumanager| Oct 27, 2017 |[Add file backed state to cpu manager ](https://github.com/kubernetes/kubernetes/pull/54408)
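
The framework also gives components two sentinel errors to branch on when restoring state at startup (`ErrCheckpointNotFound` and `ErrCorruptCheckpoint`, defined in `checkpointmanager/errors`). A minimal sketch, assuming the packages added in this PR; the `restoreState` helper and the `"my_component_state"` key are illustrative, and the corrupt-checkpoint recovery shown is only one option, per the disclaimer above:

```go
package example

import (
	"fmt"

	"k8s.io/kubernetes/pkg/kubelet/checkpointmanager"
	cperrors "k8s.io/kubernetes/pkg/kubelet/checkpointmanager/errors"
)

// restoreState is an illustrative restore path for a component that has
// adopted the shared checkpoint manager; "my_component_state" is a made-up key.
func restoreState(cpm checkpointmanager.CheckpointManager, cp checkpointmanager.Checkpoint) error {
	err := cpm.GetCheckpoint("my_component_state", cp)
	switch err {
	case nil:
		// State restored and checksum verified.
		return nil
	case cperrors.ErrCheckpointNotFound:
		// First start, or the checkpoint was removed: begin with empty state.
		return nil
	case cperrors.ErrCorruptCheckpoint:
		// Checksum mismatch, e.g. the on-disk data predates the current
		// structure; one possible recovery is to drop the stale checkpoint
		// and start fresh. How to recover is up to the component.
		return cpm.RemoveCheckpoint("my_component_state")
	default:
		return fmt.Errorf("failed to restore checkpoint: %v", err)
	}
}
```

The device manager's `readCheckpoint` in the diff below follows the same pattern for the not-found case.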

View File

@ -0,0 +1,110 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package checkpointmanager
import (
"fmt"
"sync"
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager/errors"
utilstore "k8s.io/kubernetes/pkg/kubelet/util/store"
utilfs "k8s.io/kubernetes/pkg/util/filesystem"
)
// Checkpoint provides the process checkpoint data
type Checkpoint interface {
MarshalCheckpoint() ([]byte, error)
UnmarshalCheckpoint(blob []byte) error
VerifyChecksum() error
}
// CheckpointManager provides the interface to manage checkpoint
type CheckpointManager interface {
// CreateCheckpoint persists checkpoint in CheckpointStore. checkpointKey is the key for utilstore to locate checkpoint.
// For file backed utilstore, checkpointKey is the file name to write the checkpoint data.
CreateCheckpoint(checkpointKey string, checkpoint Checkpoint) error
// GetCheckpoint retrieves checkpoint from CheckpointStore.
GetCheckpoint(checkpointKey string, checkpoint Checkpoint) error
// WARNING: RemoveCheckpoint will not return error if checkpoint does not exist.
RemoveCheckpoint(checkpointKey string) error
// ListCheckpoint returns the list of existing checkpoints.
ListCheckpoints() ([]string, error)
}
// impl is an implementation of CheckpointManager. It persists checkpoint in CheckpointStore
type impl struct {
path string
store utilstore.Store
mutex sync.Mutex
}
// NewCheckpointManager returns a new instance of a checkpoint manager
func NewCheckpointManager(checkpointDir string) (CheckpointManager, error) {
fstore, err := utilstore.NewFileStore(checkpointDir, utilfs.DefaultFs{})
if err != nil {
return nil, err
}
return &impl{path: checkpointDir, store: fstore}, nil
}
// CreateCheckpoint persists checkpoint in CheckpointStore.
func (manager *impl) CreateCheckpoint(checkpointKey string, checkpoint Checkpoint) error {
manager.mutex.Lock()
defer manager.mutex.Unlock()
blob, err := checkpoint.MarshalCheckpoint()
if err != nil {
return err
}
return manager.store.Write(checkpointKey, blob)
}
// GetCheckpoint retrieves checkpoint from CheckpointStore.
func (manager *impl) GetCheckpoint(checkpointKey string, checkpoint Checkpoint) error {
manager.mutex.Lock()
defer manager.mutex.Unlock()
blob, err := manager.store.Read(checkpointKey)
if err != nil {
if err == utilstore.ErrKeyNotFound {
return errors.ErrCheckpointNotFound
}
return err
}
err = checkpoint.UnmarshalCheckpoint(blob)
if err == nil {
err = checkpoint.VerifyChecksum()
}
return err
}
// RemoveCheckpoint will not return error if checkpoint does not exist.
func (manager *impl) RemoveCheckpoint(checkpointKey string) error {
manager.mutex.Lock()
defer manager.mutex.Unlock()
return manager.store.Delete(checkpointKey)
}
// ListCheckpoints returns the list of existing checkpoints.
func (manager *impl) ListCheckpoints() ([]string, error) {
manager.mutex.Lock()
defer manager.mutex.Unlock()
keys, err := manager.store.List()
if err != nil {
return []string{}, fmt.Errorf("failed to list checkpoint store: %v", err)
}
return keys, nil
}

View File

@ -0,0 +1,245 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package checkpointmanager
import (
"encoding/json"
"sort"
"testing"
"github.com/stretchr/testify/assert"
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager/checksum"
utilstore "k8s.io/kubernetes/pkg/kubelet/checkpointmanager/testing"
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager/testing/example_checkpoint_formats/v1"
)
var testStore *utilstore.MemStore
type FakeCheckpoint interface {
Checkpoint
GetData() ([]*PortMapping, bool)
}
// Data contains all types of data that can be stored in the checkpoint.
type Data struct {
PortMappings []*PortMapping `json:"port_mappings,omitempty"`
HostNetwork bool `json:"host_network,omitempty"`
}
type CheckpointDataV2 struct {
PortMappings []*PortMapping `json:"port_mappings,omitempty"`
HostNetwork bool `json:"host_network,omitempty"`
V2Field string `json:"v2field"`
}
type protocol string
// portMapping is the port mapping configurations of a sandbox.
type PortMapping struct {
// protocol of the port mapping.
Protocol *protocol
// Port number within the container.
ContainerPort *int32
// Port number on the host.
HostPort *int32
}
// CheckpointData is a sample example structure to be used in test cases for checkpointing
type CheckpointData struct {
Version string
Name string
Data *Data
Checksum checksum.Checksum
}
func newFakeCheckpointV1(name string, portMappings []*PortMapping, hostNetwork bool) FakeCheckpoint {
return &CheckpointData{
Version: "v1",
Name: name,
Data: &Data{
PortMappings: portMappings,
HostNetwork: hostNetwork,
},
}
}
func (cp *CheckpointData) MarshalCheckpoint() ([]byte, error) {
cp.Checksum = checksum.New(*cp.Data)
return json.Marshal(*cp)
}
func (cp *CheckpointData) UnmarshalCheckpoint(blob []byte) error {
return json.Unmarshal(blob, cp)
}
func (cp *CheckpointData) VerifyChecksum() error {
return cp.Checksum.Verify(*cp.Data)
}
func (cp *CheckpointData) GetData() ([]*PortMapping, bool) {
return cp.Data.PortMappings, cp.Data.HostNetwork
}
type checkpointDataV2 struct {
Version string
Name string
Data *CheckpointDataV2
Checksum checksum.Checksum
}
func newFakeCheckpointV2(name string, portMappings []*PortMapping, hostNetwork bool) FakeCheckpoint {
return &checkpointDataV2{
Version: "v2",
Name: name,
Data: &CheckpointDataV2{
PortMappings: portMappings,
HostNetwork: hostNetwork,
},
}
}
func newFakeCheckpointRemoteV1(name string, portMappings []*v1.PortMapping, hostNetwork bool) Checkpoint {
return &v1.CheckpointData{
Version: "v1",
Name: name,
Data: &v1.Data{
PortMappings: portMappings,
HostNetwork: hostNetwork,
},
}
}
func (cp *checkpointDataV2) MarshalCheckpoint() ([]byte, error) {
cp.Checksum = checksum.New(*cp.Data)
return json.Marshal(*cp)
}
func (cp *checkpointDataV2) UnmarshalCheckpoint(blob []byte) error {
return json.Unmarshal(blob, cp)
}
func (cp *checkpointDataV2) VerifyChecksum() error {
return cp.Checksum.Verify(*cp.Data)
}
func (cp *checkpointDataV2) GetData() ([]*PortMapping, bool) {
return cp.Data.PortMappings, cp.Data.HostNetwork
}
func newTestCheckpointManager() CheckpointManager {
return &impl{store: testStore}
}
func TestCheckpointManager(t *testing.T) {
var err error
testStore = utilstore.NewMemStore()
manager := newTestCheckpointManager()
port80 := int32(80)
port443 := int32(443)
proto := protocol("tcp")
portMappings := []*PortMapping{
{
&proto,
&port80,
&port80,
},
{
&proto,
&port443,
&port443,
},
}
checkpoint1 := newFakeCheckpointV1("check1", portMappings, true)
checkpoints := []struct {
checkpointKey string
checkpoint FakeCheckpoint
expectHostNetwork bool
}{
{
"key1",
checkpoint1,
true,
},
{
"key2",
newFakeCheckpointV1("check2", nil, false),
false,
},
}
for _, tc := range checkpoints {
// Test CreateCheckpoints
err = manager.CreateCheckpoint(tc.checkpointKey, tc.checkpoint)
assert.NoError(t, err)
// Test GetCheckpoints
checkpointOut := newFakeCheckpointV1("", nil, false)
err := manager.GetCheckpoint(tc.checkpointKey, checkpointOut)
assert.NoError(t, err)
actualPortMappings, actualHostNetwork := checkpointOut.GetData()
expPortMappings, expHostNetwork := tc.checkpoint.GetData()
assert.Equal(t, actualPortMappings, expPortMappings)
assert.Equal(t, actualHostNetwork, expHostNetwork)
}
// Test it fails if tried to read V1 structure into V2, a different structure from the structure which is checkpointed
checkpointV2 := newFakeCheckpointV2("", nil, false)
err = manager.GetCheckpoint("key1", checkpointV2)
assert.EqualError(t, err, "checkpoint is corrupted")
// Test it fails if tried to read V1 structure into the same structure but defined in another package
checkpointRemoteV1 := newFakeCheckpointRemoteV1("", nil, false)
err = manager.GetCheckpoint("key1", checkpointRemoteV1)
assert.EqualError(t, err, "checkpoint is corrupted")
// Test it works if tried to read V1 structure using into a new V1 structure
checkpointV1 := newFakeCheckpointV1("", nil, false)
err = manager.GetCheckpoint("key1", checkpointV1)
assert.NoError(t, err)
// Test corrupt checksum case
checkpointOut := newFakeCheckpointV1("", nil, false)
blob, err := checkpointOut.MarshalCheckpoint()
assert.NoError(t, err)
testStore.Write("key1", blob)
err = manager.GetCheckpoint("key1", checkpoint1)
assert.EqualError(t, err, "checkpoint is corrupted")
// Test ListCheckpoints
keys, err := manager.ListCheckpoints()
assert.NoError(t, err)
sort.Strings(keys)
assert.Equal(t, keys, []string{"key1", "key2"})
// Test RemoveCheckpoints
err = manager.RemoveCheckpoint("key1")
assert.NoError(t, err)
// Test Remove Nonexisted Checkpoints
err = manager.RemoveCheckpoint("key1")
assert.NoError(t, err)
// Test ListCheckpoints
keys, err = manager.ListCheckpoints()
assert.NoError(t, err)
assert.Equal(t, keys, []string{"key2"})
// Test Get NonExisted Checkpoint
checkpointNE := newFakeCheckpointV1("NE", nil, false)
err = manager.GetCheckpoint("key1", checkpointNE)
assert.Error(t, err)
}

View File

@ -0,0 +1,26 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library")
go_library(
name = "go_default_library",
srcs = ["checksum.go"],
importpath = "k8s.io/kubernetes/pkg/kubelet/checkpointmanager/checksum",
visibility = ["//visibility:public"],
deps = [
"//pkg/kubelet/checkpointmanager/errors:go_default_library",
"//pkg/util/hash:go_default_library",
],
)
filegroup(
name = "package-srcs",
srcs = glob(["**"]),
tags = ["automanaged"],
visibility = ["//visibility:private"],
)
filegroup(
name = "all-srcs",
srcs = [":package-srcs"],
tags = ["automanaged"],
visibility = ["//visibility:public"],
)

View File

@ -0,0 +1,46 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package checksum
import (
"hash/fnv"
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager/errors"
hashutil "k8s.io/kubernetes/pkg/util/hash"
)
// Data to be stored as checkpoint
type Checksum uint64
// VerifyChecksum verifies that passed checksum is same as calculated checksum
func (cs Checksum) Verify(data interface{}) error {
if cs != New(data) {
return errors.ErrCorruptCheckpoint
}
return nil
}
func New(data interface{}) Checksum {
return Checksum(getChecksum(data))
}
// Get returns calculated checksum of checkpoint data
func getChecksum(data interface{}) uint64 {
hash := fnv.New32a()
hashutil.DeepHashObject(hash, data)
return uint64(hash.Sum32())
}
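
`Checksum` is a `uint64` holding an FNV-32a deep hash of the checkpointed data, so any change to the stored value (or its type) fails verification. A small sketch of the round trip, assuming this `checksum` package; the `entries` payload is illustrative:

```go
package example

import (
	"k8s.io/kubernetes/pkg/kubelet/checkpointmanager/checksum"
)

// demoChecksum computes a checksum over a payload and shows that verification
// fails once the payload changes.
func demoChecksum() (errSame, errChanged error) {
	entries := []string{"dev1", "dev2"} // illustrative payload
	cs := checksum.New(entries)

	// Same payload: Verify returns nil.
	errSame = cs.Verify(entries)

	// Modified payload: Verify returns errors.ErrCorruptCheckpoint.
	entries = append(entries, "dev3")
	errChanged = cs.Verify(entries)
	return errSame, errChanged
}
```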

View File

@ -7,8 +7,8 @@ load(
go_library( go_library(
name = "go_default_library", name = "go_default_library",
srcs = ["util.go"], srcs = ["errors.go"],
importpath = "k8s.io/kubernetes/pkg/kubelet/dockershim/testing", importpath = "k8s.io/kubernetes/pkg/kubelet/checkpointmanager/errors",
) )
filegroup( filegroup(

View File

@ -0,0 +1,25 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package errors
import "fmt"
// ErrCorruptCheckpoint error is reported when checksum does not match
var ErrCorruptCheckpoint = fmt.Errorf("checkpoint is corrupted")
// ErrCheckpointNotFound is reported when checkpoint is not found for a given key
var ErrCheckpointNotFound = fmt.Errorf("checkpoint is not found")

View File

@ -0,0 +1,28 @@
package(default_visibility = ["//visibility:public"])
load(
"@io_bazel_rules_go//go:def.bzl",
"go_library",
)
go_library(
name = "go_default_library",
srcs = ["util.go"],
importpath = "k8s.io/kubernetes/pkg/kubelet/checkpointmanager/testing",
)
filegroup(
name = "package-srcs",
srcs = glob(["**"]),
tags = ["automanaged"],
visibility = ["//visibility:private"],
)
filegroup(
name = "all-srcs",
srcs = [
":package-srcs",
"//pkg/kubelet/checkpointmanager/testing/example_checkpoint_formats/v1:all-srcs",
],
tags = ["automanaged"],
)

View File

@ -0,0 +1,23 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library")
go_library(
name = "go_default_library",
srcs = ["types.go"],
importpath = "k8s.io/kubernetes/pkg/kubelet/checkpointmanager/testing/example_checkpoint_formats/v1",
visibility = ["//visibility:public"],
deps = ["//pkg/kubelet/checkpointmanager/checksum:go_default_library"],
)
filegroup(
name = "package-srcs",
srcs = glob(["**"]),
tags = ["automanaged"],
visibility = ["//visibility:private"],
)
filegroup(
name = "all-srcs",
srcs = [":package-srcs"],
tags = ["automanaged"],
visibility = ["//visibility:public"],
)

View File

@ -0,0 +1,62 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package v1
import (
"encoding/json"
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager/checksum"
)
type protocol string
// portMapping is the port mapping configurations of a sandbox.
type PortMapping struct {
// protocol of the port mapping.
Protocol *protocol
// Port number within the container.
ContainerPort *int32
// Port number on the host.
HostPort *int32
}
// CheckpointData contains all types of data that can be stored in the checkpoint.
type Data struct {
PortMappings []*PortMapping `json:"port_mappings,omitempty"`
HostNetwork bool `json:"host_network,omitempty"`
}
// CheckpointData is a sample example structure to be used in test cases for checkpointing
type CheckpointData struct {
Version string
Name string
Data *Data
Checksum checksum.Checksum
}
func (cp *CheckpointData) MarshalCheckpoint() ([]byte, error) {
cp.Checksum = checksum.New(*cp.Data)
return json.Marshal(*cp)
}
func (cp *CheckpointData) UnmarshalCheckpoint(blob []byte) error {
return json.Unmarshal(blob, cp)
}
func (cp *CheckpointData) VerifyChecksum() error {
return cp.Checksum.Verify(*cp.Data)
}

View File

@ -27,10 +27,12 @@ type MemStore struct {
sync.Mutex sync.Mutex
} }
// NewMemStore returns an instance of MemStore
func NewMemStore() *MemStore { func NewMemStore() *MemStore {
return &MemStore{mem: make(map[string][]byte)} return &MemStore{mem: make(map[string][]byte)}
} }
// Write writes the data to the store
func (mstore *MemStore) Write(key string, data []byte) error { func (mstore *MemStore) Write(key string, data []byte) error {
mstore.Lock() mstore.Lock()
defer mstore.Unlock() defer mstore.Unlock()
@ -38,6 +40,7 @@ func (mstore *MemStore) Write(key string, data []byte) error {
return nil return nil
} }
// Read returns data read from store
func (mstore *MemStore) Read(key string) ([]byte, error) { func (mstore *MemStore) Read(key string) ([]byte, error) {
mstore.Lock() mstore.Lock()
defer mstore.Unlock() defer mstore.Unlock()
@ -48,6 +51,7 @@ func (mstore *MemStore) Read(key string) ([]byte, error) {
return data, nil return data, nil
} }
// Delete deletes data from the store
func (mstore *MemStore) Delete(key string) error { func (mstore *MemStore) Delete(key string) error {
mstore.Lock() mstore.Lock()
defer mstore.Unlock() defer mstore.Unlock()
@ -55,6 +59,7 @@ func (mstore *MemStore) Delete(key string) error {
return nil return nil
} }
// List returns all the keys from the store
func (mstore *MemStore) List() ([]string, error) { func (mstore *MemStore) List() ([]string, error) {
mstore.Lock() mstore.Lock()
defer mstore.Unlock() defer mstore.Unlock()

View File

@ -15,13 +15,14 @@ go_library(
deps = [ deps = [
"//pkg/apis/core/v1/helper:go_default_library", "//pkg/apis/core/v1/helper:go_default_library",
"//pkg/kubelet/apis/deviceplugin/v1beta1:go_default_library", "//pkg/kubelet/apis/deviceplugin/v1beta1:go_default_library",
"//pkg/kubelet/checkpointmanager:go_default_library",
"//pkg/kubelet/checkpointmanager/errors:go_default_library",
"//pkg/kubelet/cm/devicemanager/checkpoint:go_default_library",
"//pkg/kubelet/config:go_default_library", "//pkg/kubelet/config:go_default_library",
"//pkg/kubelet/container:go_default_library", "//pkg/kubelet/container:go_default_library",
"//pkg/kubelet/lifecycle:go_default_library", "//pkg/kubelet/lifecycle:go_default_library",
"//pkg/kubelet/metrics:go_default_library", "//pkg/kubelet/metrics:go_default_library",
"//pkg/kubelet/util/store:go_default_library",
"//pkg/scheduler/schedulercache:go_default_library", "//pkg/scheduler/schedulercache:go_default_library",
"//pkg/util/filesystem:go_default_library",
"//vendor/github.com/golang/glog:go_default_library", "//vendor/github.com/golang/glog:go_default_library",
"//vendor/google.golang.org/grpc:go_default_library", "//vendor/google.golang.org/grpc:go_default_library",
"//vendor/k8s.io/api/core/v1:go_default_library", "//vendor/k8s.io/api/core/v1:go_default_library",
@ -39,10 +40,9 @@ go_test(
embed = [":go_default_library"], embed = [":go_default_library"],
deps = [ deps = [
"//pkg/kubelet/apis/deviceplugin/v1beta1:go_default_library", "//pkg/kubelet/apis/deviceplugin/v1beta1:go_default_library",
"//pkg/kubelet/checkpointmanager:go_default_library",
"//pkg/kubelet/lifecycle:go_default_library", "//pkg/kubelet/lifecycle:go_default_library",
"//pkg/kubelet/util/store:go_default_library",
"//pkg/scheduler/schedulercache:go_default_library", "//pkg/scheduler/schedulercache:go_default_library",
"//pkg/util/filesystem:go_default_library",
"//vendor/github.com/stretchr/testify/assert:go_default_library", "//vendor/github.com/stretchr/testify/assert:go_default_library",
"//vendor/github.com/stretchr/testify/require:go_default_library", "//vendor/github.com/stretchr/testify/require:go_default_library",
"//vendor/k8s.io/api/core/v1:go_default_library", "//vendor/k8s.io/api/core/v1:go_default_library",
@ -62,7 +62,10 @@ filegroup(
filegroup( filegroup(
name = "all-srcs", name = "all-srcs",
srcs = [":package-srcs"], srcs = [
":package-srcs",
"//pkg/kubelet/cm/devicemanager/checkpoint:all-srcs",
],
tags = ["automanaged"], tags = ["automanaged"],
visibility = ["//visibility:public"], visibility = ["//visibility:public"],
) )

View File

@ -0,0 +1,26 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library")
go_library(
name = "go_default_library",
srcs = ["checkpoint.go"],
importpath = "k8s.io/kubernetes/pkg/kubelet/cm/devicemanager/checkpoint",
visibility = ["//visibility:public"],
deps = [
"//pkg/kubelet/checkpointmanager:go_default_library",
"//pkg/kubelet/checkpointmanager/checksum:go_default_library",
],
)
filegroup(
name = "package-srcs",
srcs = glob(["**"]),
tags = ["automanaged"],
visibility = ["//visibility:private"],
)
filegroup(
name = "all-srcs",
srcs = [":package-srcs"],
tags = ["automanaged"],
visibility = ["//visibility:public"],
)

View File

@ -0,0 +1,81 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package checkpoint
import (
"encoding/json"
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager"
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager/checksum"
)
type DeviceManagerCheckpoint interface {
checkpointmanager.Checkpoint
GetData() ([]PodDevicesEntry, map[string][]string)
}
type PodDevicesEntry struct {
PodUID string
ContainerName string
ResourceName string
DeviceIDs []string
AllocResp []byte
}
// checkpointData struct is used to store pod to device allocation information
// in a checkpoint file.
// TODO: add version control when we need to change checkpoint format.
type checkpointData struct {
PodDeviceEntries []PodDevicesEntry
RegisteredDevices map[string][]string
}
type Data struct {
Data checkpointData
Checksum checksum.Checksum
}
// NewDeviceManagerCheckpoint returns an instance of Checkpoint
func New(devEntries []PodDevicesEntry,
devices map[string][]string) DeviceManagerCheckpoint {
return &Data{
Data: checkpointData{
PodDeviceEntries: devEntries,
RegisteredDevices: devices,
},
}
}
// MarshalCheckpoint returns marshalled data
func (cp *Data) MarshalCheckpoint() ([]byte, error) {
cp.Checksum = checksum.New(cp.Data)
return json.Marshal(*cp)
}
// UnmarshalCheckpoint returns unmarshalled data
func (cp *Data) UnmarshalCheckpoint(blob []byte) error {
return json.Unmarshal(blob, cp)
}
// VerifyChecksum verifies that passed checksum is same as calculated checksum
func (cp *Data) VerifyChecksum() error {
return cp.Checksum.Verify(cp.Data)
}
func (cp *Data) GetData() ([]PodDevicesEntry, map[string][]string) {
return cp.Data.PodDeviceEntries, cp.Data.RegisteredDevices
}

View File

@ -18,7 +18,6 @@ package devicemanager
import ( import (
"context" "context"
"encoding/json"
"fmt" "fmt"
"net" "net"
"os" "os"
@ -34,12 +33,13 @@ import (
"k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/sets"
v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper" v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper"
pluginapi "k8s.io/kubernetes/pkg/kubelet/apis/deviceplugin/v1beta1" pluginapi "k8s.io/kubernetes/pkg/kubelet/apis/deviceplugin/v1beta1"
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager"
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager/errors"
"k8s.io/kubernetes/pkg/kubelet/cm/devicemanager/checkpoint"
"k8s.io/kubernetes/pkg/kubelet/config" "k8s.io/kubernetes/pkg/kubelet/config"
"k8s.io/kubernetes/pkg/kubelet/lifecycle" "k8s.io/kubernetes/pkg/kubelet/lifecycle"
"k8s.io/kubernetes/pkg/kubelet/metrics" "k8s.io/kubernetes/pkg/kubelet/metrics"
utilstore "k8s.io/kubernetes/pkg/kubelet/util/store"
"k8s.io/kubernetes/pkg/scheduler/schedulercache" "k8s.io/kubernetes/pkg/scheduler/schedulercache"
utilfs "k8s.io/kubernetes/pkg/util/filesystem"
) )
// ActivePodsFunc is a function that returns a list of pods to reconcile. // ActivePodsFunc is a function that returns a list of pods to reconcile.
@ -84,8 +84,8 @@ type ManagerImpl struct {
// podDevices contains pod to allocated device mapping. // podDevices contains pod to allocated device mapping.
podDevices podDevices podDevices podDevices
store utilstore.Store
pluginOpts map[string]*pluginapi.DevicePluginOptions pluginOpts map[string]*pluginapi.DevicePluginOptions
checkpointManager checkpointmanager.CheckpointManager
} }
type sourcesReadyStub struct{} type sourcesReadyStub struct{}
@ -122,11 +122,11 @@ func newManagerImpl(socketPath string) (*ManagerImpl, error) {
// Before that, initializes them to perform no-op operations. // Before that, initializes them to perform no-op operations.
manager.activePods = func() []*v1.Pod { return []*v1.Pod{} } manager.activePods = func() []*v1.Pod { return []*v1.Pod{} }
manager.sourcesReady = &sourcesReadyStub{} manager.sourcesReady = &sourcesReadyStub{}
var err error checkpointManager, err := checkpointmanager.NewCheckpointManager(dir)
manager.store, err = utilstore.NewFileStore(dir, utilfs.DefaultFs{})
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to initialize device plugin checkpointing store: %+v", err) return nil, fmt.Errorf("failed to initialize checkpoint manager: %+v", err)
} }
manager.checkpointManager = checkpointManager
return manager, nil return manager, nil
} }
@ -454,33 +454,19 @@ func (m *ManagerImpl) GetCapacity() (v1.ResourceList, v1.ResourceList, []string)
return capacity, allocatable, deletedResources.UnsortedList() return capacity, allocatable, deletedResources.UnsortedList()
} }
// checkpointData struct is used to store pod to device allocation information
// and registered device information in a checkpoint file.
// TODO: add version control when we need to change checkpoint format.
type checkpointData struct {
PodDeviceEntries []podDevicesCheckpointEntry
RegisteredDevices map[string][]string
}
// Checkpoints device to container allocation information to disk. // Checkpoints device to container allocation information to disk.
func (m *ManagerImpl) writeCheckpoint() error { func (m *ManagerImpl) writeCheckpoint() error {
m.mutex.Lock() m.mutex.Lock()
data := checkpointData{ registeredDevs := make(map[string][]string)
PodDeviceEntries: m.podDevices.toCheckpointData(),
RegisteredDevices: make(map[string][]string),
}
for resource, devices := range m.healthyDevices { for resource, devices := range m.healthyDevices {
data.RegisteredDevices[resource] = devices.UnsortedList() registeredDevs[resource] = devices.UnsortedList()
} }
data := checkpoint.New(m.podDevices.toCheckpointData(),
registeredDevs)
m.mutex.Unlock() m.mutex.Unlock()
err := m.checkpointManager.CreateCheckpoint(kubeletDeviceManagerCheckpoint, data)
dataJSON, err := json.Marshal(data)
if err != nil { if err != nil {
return err return fmt.Errorf("failed to write checkpoint file %q: %v", kubeletDeviceManagerCheckpoint, err)
}
err = m.store.Write(kubeletDeviceManagerCheckpoint, dataJSON)
if err != nil {
return fmt.Errorf("failed to write deviceplugin checkpoint file %q: %v", kubeletDeviceManagerCheckpoint, err)
} }
return nil return nil
} }
@ -488,24 +474,23 @@ func (m *ManagerImpl) writeCheckpoint() error {
// Reads device to container allocation information from disk, and populates // Reads device to container allocation information from disk, and populates
// m.allocatedDevices accordingly. // m.allocatedDevices accordingly.
func (m *ManagerImpl) readCheckpoint() error { func (m *ManagerImpl) readCheckpoint() error {
content, err := m.store.Read(kubeletDeviceManagerCheckpoint) registeredDevs := make(map[string][]string)
devEntries := make([]checkpoint.PodDevicesEntry, 0)
cp := checkpoint.New(devEntries, registeredDevs)
err := m.checkpointManager.GetCheckpoint(kubeletDeviceManagerCheckpoint, cp)
if err != nil { if err != nil {
if err == utilstore.ErrKeyNotFound { if err == errors.ErrCheckpointNotFound {
glog.Warningf("Failed to retrieve checkpoint for %q: %v", kubeletDeviceManagerCheckpoint, err)
return nil return nil
} }
return fmt.Errorf("failed to read checkpoint file %q: %v", kubeletDeviceManagerCheckpoint, err) return err
} }
glog.V(4).Infof("Read checkpoint file %s\n", kubeletDeviceManagerCheckpoint)
var data checkpointData
if err := json.Unmarshal(content, &data); err != nil {
return fmt.Errorf("failed to unmarshal deviceplugin checkpoint data: %v", err)
}
m.mutex.Lock() m.mutex.Lock()
defer m.mutex.Unlock() defer m.mutex.Unlock()
m.podDevices.fromCheckpointData(data.PodDeviceEntries) podDevices, registeredDevs := cp.GetData()
m.podDevices.fromCheckpointData(podDevices)
m.allocatedDevices = m.podDevices.devices() m.allocatedDevices = m.podDevices.devices()
for resource := range data.RegisteredDevices { for resource := range registeredDevs {
// During start up, creates empty healthyDevices list so that the resource capacity // During start up, creates empty healthyDevices list so that the resource capacity
// will stay zero till the corresponding device plugin re-registers. // will stay zero till the corresponding device plugin re-registers.
m.healthyDevices[resource] = sets.NewString() m.healthyDevices[resource] = sets.NewString()

View File

@ -17,7 +17,6 @@ limitations under the License.
package devicemanager package devicemanager
import ( import (
"flag"
"fmt" "fmt"
"io/ioutil" "io/ioutil"
"os" "os"
@ -34,10 +33,9 @@ import (
"k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/uuid" "k8s.io/apimachinery/pkg/util/uuid"
pluginapi "k8s.io/kubernetes/pkg/kubelet/apis/deviceplugin/v1beta1" pluginapi "k8s.io/kubernetes/pkg/kubelet/apis/deviceplugin/v1beta1"
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager"
"k8s.io/kubernetes/pkg/kubelet/lifecycle" "k8s.io/kubernetes/pkg/kubelet/lifecycle"
utilstore "k8s.io/kubernetes/pkg/kubelet/util/store"
"k8s.io/kubernetes/pkg/scheduler/schedulercache" "k8s.io/kubernetes/pkg/scheduler/schedulercache"
utilfs "k8s.io/kubernetes/pkg/util/filesystem"
) )
const ( const (
@ -347,20 +345,19 @@ func constructAllocResp(devices, mounts, envs map[string]string) *pluginapi.Cont
func TestCheckpoint(t *testing.T) { func TestCheckpoint(t *testing.T) {
resourceName1 := "domain1.com/resource1" resourceName1 := "domain1.com/resource1"
resourceName2 := "domain2.com/resource2" resourceName2 := "domain2.com/resource2"
as := assert.New(t) as := assert.New(t)
tmpDir, err := ioutil.TempDir("", "checkpoint") tmpDir, err := ioutil.TempDir("", "checkpoint")
as.Nil(err) as.Nil(err)
defer os.RemoveAll(tmpDir) ckm, err := checkpointmanager.NewCheckpointManager(tmpDir)
as.Nil(err)
testManager := &ManagerImpl{ testManager := &ManagerImpl{
socketdir: tmpDir,
endpoints: make(map[string]endpoint), endpoints: make(map[string]endpoint),
healthyDevices: make(map[string]sets.String), healthyDevices: make(map[string]sets.String),
unhealthyDevices: make(map[string]sets.String), unhealthyDevices: make(map[string]sets.String),
allocatedDevices: make(map[string]sets.String), allocatedDevices: make(map[string]sets.String),
podDevices: make(podDevices), podDevices: make(podDevices),
checkpointManager: ckm,
} }
testManager.store, _ = utilstore.NewFileStore("/tmp/", utilfs.DefaultFs{})
testManager.podDevices.insert("pod1", "con1", resourceName1, testManager.podDevices.insert("pod1", "con1", resourceName1,
constructDevices([]string{"dev1", "dev2"}), constructDevices([]string{"dev1", "dev2"}),
@ -479,8 +476,12 @@ func makePod(limits v1.ResourceList) *v1.Pod {
} }
} }
func getTestManager(tmpDir string, activePods ActivePodsFunc, testRes []TestResource, opts map[string]*pluginapi.DevicePluginOptions) *ManagerImpl { func getTestManager(tmpDir string, activePods ActivePodsFunc, testRes []TestResource, opts map[string]*pluginapi.DevicePluginOptions) (*ManagerImpl, error) {
monitorCallback := func(resourceName string, added, updated, deleted []pluginapi.Device) {} monitorCallback := func(resourceName string, added, updated, deleted []pluginapi.Device) {}
ckm, err := checkpointmanager.NewCheckpointManager(tmpDir)
if err != nil {
return nil, err
}
testManager := &ManagerImpl{ testManager := &ManagerImpl{
socketdir: tmpDir, socketdir: tmpDir,
callback: monitorCallback, callback: monitorCallback,
@ -492,8 +493,8 @@ func getTestManager(tmpDir string, activePods ActivePodsFunc, testRes []TestReso
podDevices: make(podDevices), podDevices: make(podDevices),
activePods: activePods, activePods: activePods,
sourcesReady: &sourcesReadyStub{}, sourcesReady: &sourcesReadyStub{},
checkpointManager: ckm,
} }
testManager.store, _ = utilstore.NewFileStore("/tmp/", utilfs.DefaultFs{})
for _, res := range testRes { for _, res := range testRes {
testManager.healthyDevices[res.resourceName] = sets.NewString() testManager.healthyDevices[res.resourceName] = sets.NewString()
for _, dev := range res.devs { for _, dev := range res.devs {
@ -525,7 +526,7 @@ func getTestManager(tmpDir string, activePods ActivePodsFunc, testRes []TestReso
} }
} }
} }
return testManager return testManager, nil
} }
func getTestNodeInfo(allocatable v1.ResourceList) *schedulercache.NodeInfo { func getTestNodeInfo(allocatable v1.ResourceList) *schedulercache.NodeInfo {
@ -546,7 +547,6 @@ type TestResource struct {
} }
func TestPodContainerDeviceAllocation(t *testing.T) { func TestPodContainerDeviceAllocation(t *testing.T) {
flag.Set("alsologtostderr", fmt.Sprintf("%t", true))
res1 := TestResource{ res1 := TestResource{
resourceName: "domain1.com/resource1", resourceName: "domain1.com/resource1",
resourceQuantity: *resource.NewQuantity(int64(2), resource.DecimalSI), resourceQuantity: *resource.NewQuantity(int64(2), resource.DecimalSI),
@ -569,7 +569,8 @@ func TestPodContainerDeviceAllocation(t *testing.T) {
defer os.RemoveAll(tmpDir) defer os.RemoveAll(tmpDir)
nodeInfo := getTestNodeInfo(v1.ResourceList{}) nodeInfo := getTestNodeInfo(v1.ResourceList{})
pluginOpts := make(map[string]*pluginapi.DevicePluginOptions) pluginOpts := make(map[string]*pluginapi.DevicePluginOptions)
testManager := getTestManager(tmpDir, podsStub.getActivePods, testResources, pluginOpts) testManager, err := getTestManager(tmpDir, podsStub.getActivePods, testResources, pluginOpts)
as.Nil(err)
testPods := []*v1.Pod{ testPods := []*v1.Pod{
makePod(v1.ResourceList{ makePod(v1.ResourceList{
@ -664,7 +665,8 @@ func TestInitContainerDeviceAllocation(t *testing.T) {
as.Nil(err) as.Nil(err)
defer os.RemoveAll(tmpDir) defer os.RemoveAll(tmpDir)
pluginOpts := make(map[string]*pluginapi.DevicePluginOptions) pluginOpts := make(map[string]*pluginapi.DevicePluginOptions)
testManager := getTestManager(tmpDir, podsStub.getActivePods, testResources, pluginOpts) testManager, err := getTestManager(tmpDir, podsStub.getActivePods, testResources, pluginOpts)
as.Nil(err)
podWithPluginResourcesInInitContainers := &v1.Pod{ podWithPluginResourcesInInitContainers := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{ ObjectMeta: metav1.ObjectMeta{
@ -742,14 +744,18 @@ func TestSanitizeNodeAllocatable(t *testing.T) {
as := assert.New(t) as := assert.New(t)
monitorCallback := func(resourceName string, added, updated, deleted []pluginapi.Device) {} monitorCallback := func(resourceName string, added, updated, deleted []pluginapi.Device) {}
tmpDir, err := ioutil.TempDir("", "checkpoint")
as.Nil(err)
ckm, err := checkpointmanager.NewCheckpointManager(tmpDir)
as.Nil(err)
testManager := &ManagerImpl{ testManager := &ManagerImpl{
callback: monitorCallback, callback: monitorCallback,
healthyDevices: make(map[string]sets.String),
allocatedDevices: make(map[string]sets.String), allocatedDevices: make(map[string]sets.String),
healthyDevices: make(map[string]sets.String),
podDevices: make(podDevices), podDevices: make(podDevices),
checkpointManager: ckm,
} }
testManager.store, _ = utilstore.NewFileStore("/tmp/", utilfs.DefaultFs{})
// require one of resource1 and one of resource2 // require one of resource1 and one of resource2
testManager.allocatedDevices[resourceName1] = sets.NewString() testManager.allocatedDevices[resourceName1] = sets.NewString()
testManager.allocatedDevices[resourceName1].Insert(devID1) testManager.allocatedDevices[resourceName1].Insert(devID1)
@ -796,7 +802,8 @@ func TestDevicePreStartContainer(t *testing.T) {
pluginOpts := make(map[string]*pluginapi.DevicePluginOptions) pluginOpts := make(map[string]*pluginapi.DevicePluginOptions)
pluginOpts[res1.resourceName] = &pluginapi.DevicePluginOptions{PreStartRequired: true} pluginOpts[res1.resourceName] = &pluginapi.DevicePluginOptions{PreStartRequired: true}
testManager := getTestManager(tmpDir, podsStub.getActivePods, []TestResource{res1}, pluginOpts) testManager, err := getTestManager(tmpDir, podsStub.getActivePods, []TestResource{res1}, pluginOpts)
as.Nil(err)
ch := make(chan []string, 1) ch := make(chan []string, 1)
testManager.endpoints[res1.resourceName] = &MockEndpoint{ testManager.endpoints[res1.resourceName] = &MockEndpoint{

View File

@ -21,6 +21,7 @@ import (
"k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/sets"
pluginapi "k8s.io/kubernetes/pkg/kubelet/apis/deviceplugin/v1beta1" pluginapi "k8s.io/kubernetes/pkg/kubelet/apis/deviceplugin/v1beta1"
"k8s.io/kubernetes/pkg/kubelet/cm/devicemanager/checkpoint"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
) )
@ -126,18 +127,9 @@ func (pdev podDevices) devices() map[string]sets.String {
return ret return ret
} }
// podDevicesCheckpointEntry is used to record <pod, container> to device allocation information.
type podDevicesCheckpointEntry struct {
PodUID string
ContainerName string
ResourceName string
DeviceIDs []string
AllocResp []byte
}
// Turns podDevices to checkpointData. // Turns podDevices to checkpointData.
func (pdev podDevices) toCheckpointData() []podDevicesCheckpointEntry { func (pdev podDevices) toCheckpointData() []checkpoint.PodDevicesEntry {
var data []podDevicesCheckpointEntry var data []checkpoint.PodDevicesEntry
for podUID, containerDevices := range pdev { for podUID, containerDevices := range pdev {
for conName, resources := range containerDevices { for conName, resources := range containerDevices {
for resource, devices := range resources { for resource, devices := range resources {
@ -152,7 +144,12 @@ func (pdev podDevices) toCheckpointData() []podDevicesCheckpointEntry {
glog.Errorf("Can't marshal allocResp for %v %v %v: %v", podUID, conName, resource, err) glog.Errorf("Can't marshal allocResp for %v %v %v: %v", podUID, conName, resource, err)
continue continue
} }
data = append(data, podDevicesCheckpointEntry{podUID, conName, resource, devIds, allocResp}) data = append(data, checkpoint.PodDevicesEntry{
PodUID: podUID,
ContainerName: conName,
ResourceName: resource,
DeviceIDs: devIds,
AllocResp: allocResp})
} }
} }
} }
@ -160,7 +157,7 @@ func (pdev podDevices) toCheckpointData() []podDevicesCheckpointEntry {
} }
// Populates podDevices from the passed in checkpointData. // Populates podDevices from the passed in checkpointData.
func (pdev podDevices) fromCheckpointData(data []podDevicesCheckpointEntry) { func (pdev podDevices) fromCheckpointData(data []checkpoint.PodDevicesEntry) {
for _, entry := range data { for _, entry := range data {
glog.V(2).Infof("Get checkpoint entry: %v %v %v %v %v\n", glog.V(2).Infof("Get checkpoint entry: %v %v %v %v %v\n",
entry.PodUID, entry.ContainerName, entry.ResourceName, entry.DeviceIDs, entry.AllocResp) entry.PodUID, entry.ContainerName, entry.ResourceName, entry.DeviceIDs, entry.AllocResp)

View File

@ -58,6 +58,7 @@ go_library(
"//pkg/apis/core/v1:go_default_library", "//pkg/apis/core/v1:go_default_library",
"//pkg/apis/core/validation:go_default_library", "//pkg/apis/core/validation:go_default_library",
"//pkg/kubelet/checkpoint:go_default_library", "//pkg/kubelet/checkpoint:go_default_library",
"//pkg/kubelet/checkpointmanager:go_default_library",
"//pkg/kubelet/container:go_default_library", "//pkg/kubelet/container:go_default_library",
"//pkg/kubelet/events:go_default_library", "//pkg/kubelet/events:go_default_library",
"//pkg/kubelet/types:go_default_library", "//pkg/kubelet/types:go_default_library",

View File

@ -27,6 +27,7 @@ import (
"k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/tools/record" "k8s.io/client-go/tools/record"
"k8s.io/kubernetes/pkg/kubelet/checkpoint" "k8s.io/kubernetes/pkg/kubelet/checkpoint"
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/events" "k8s.io/kubernetes/pkg/kubelet/events"
kubetypes "k8s.io/kubernetes/pkg/kubelet/types" kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
@ -64,7 +65,7 @@ type PodConfig struct {
// contains the list of all configured sources // contains the list of all configured sources
sourcesLock sync.Mutex sourcesLock sync.Mutex
sources sets.String sources sets.String
checkpointManager checkpoint.Manager checkpointManager checkpointmanager.CheckpointManager
} }
// NewPodConfig creates an object that can merge many configuration sources into a stream // NewPodConfig creates an object that can merge many configuration sources into a stream
@ -114,12 +115,14 @@ func (c *PodConfig) Sync() {
func (c *PodConfig) Restore(path string, updates chan<- interface{}) error { func (c *PodConfig) Restore(path string, updates chan<- interface{}) error {
var err error var err error
if c.checkpointManager == nil { if c.checkpointManager == nil {
c.checkpointManager = checkpoint.NewCheckpointManager(path) c.checkpointManager, err = checkpointmanager.NewCheckpointManager(path)
pods, err := c.checkpointManager.LoadPods() if err != nil {
pods, err := checkpoint.LoadPods(c.checkpointManager)
if err == nil { if err == nil {
updates <- kubetypes.PodUpdate{Pods: pods, Op: kubetypes.RESTORE, Source: kubetypes.ApiserverSource} updates <- kubetypes.PodUpdate{Pods: pods, Op: kubetypes.RESTORE, Source: kubetypes.ApiserverSource}
} }
} }
}
return err return err
} }
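A minimal sketch of how the restore path can be driven, assuming a `*config.PodConfig` built elsewhere and a bootstrap checkpoint directory supplied by the caller. Only `Restore`'s signature and the `RESTORE` `PodUpdate` shape come from the hunk above; the channel handling and helper name are illustrative.

```go
package example

import (
	"fmt"

	"k8s.io/kubernetes/pkg/kubelet/config"
	kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
)

// restorePods replays checkpointed pods through a PodConfig (hypothetical
// helper; podCfg and path are assumed to be provided by the caller).
func restorePods(podCfg *config.PodConfig, path string) error {
	updates := make(chan interface{}, 1)
	if err := podCfg.Restore(path, updates); err != nil {
		return err
	}
	select {
	case u := <-updates:
		if pu, ok := u.(kubetypes.PodUpdate); ok && pu.Op == kubetypes.RESTORE {
			fmt.Printf("restored %d pods from %q\n", len(pu.Pods), path)
		}
	default:
		// Nothing was checkpointed, or this PodConfig was already restored.
	}
	return nil
}
```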

View File

@ -82,6 +82,8 @@ go_library(
"//pkg/credentialprovider:go_default_library", "//pkg/credentialprovider:go_default_library",
"//pkg/kubelet/apis/cri/runtime/v1alpha2:go_default_library", "//pkg/kubelet/apis/cri/runtime/v1alpha2:go_default_library",
"//pkg/kubelet/apis/kubeletconfig:go_default_library", "//pkg/kubelet/apis/kubeletconfig:go_default_library",
"//pkg/kubelet/checkpointmanager:go_default_library",
"//pkg/kubelet/checkpointmanager/checksum:go_default_library",
"//pkg/kubelet/cm:go_default_library", "//pkg/kubelet/cm:go_default_library",
"//pkg/kubelet/container:go_default_library", "//pkg/kubelet/container:go_default_library",
"//pkg/kubelet/dockershim/cm:go_default_library", "//pkg/kubelet/dockershim/cm:go_default_library",
@ -100,8 +102,6 @@ go_library(
"//pkg/kubelet/util/ioutils:go_default_library", "//pkg/kubelet/util/ioutils:go_default_library",
"//pkg/kubelet/util/store:go_default_library", "//pkg/kubelet/util/store:go_default_library",
"//pkg/security/apparmor:go_default_library", "//pkg/security/apparmor:go_default_library",
"//pkg/util/filesystem:go_default_library",
"//pkg/util/hash:go_default_library",
"//pkg/util/parsers:go_default_library", "//pkg/util/parsers:go_default_library",
"//vendor/github.com/armon/circbuf:go_default_library", "//vendor/github.com/armon/circbuf:go_default_library",
"//vendor/github.com/blang/semver:go_default_library", "//vendor/github.com/blang/semver:go_default_library",
@ -149,12 +149,12 @@ go_test(
embed = [":go_default_library"], embed = [":go_default_library"],
deps = [ deps = [
"//pkg/kubelet/apis/cri/runtime/v1alpha2:go_default_library", "//pkg/kubelet/apis/cri/runtime/v1alpha2:go_default_library",
"//pkg/kubelet/checkpointmanager:go_default_library",
"//pkg/kubelet/container:go_default_library", "//pkg/kubelet/container:go_default_library",
"//pkg/kubelet/container/testing:go_default_library", "//pkg/kubelet/container/testing:go_default_library",
"//pkg/kubelet/dockershim/libdocker:go_default_library", "//pkg/kubelet/dockershim/libdocker:go_default_library",
"//pkg/kubelet/dockershim/network:go_default_library", "//pkg/kubelet/dockershim/network:go_default_library",
"//pkg/kubelet/dockershim/network/testing:go_default_library", "//pkg/kubelet/dockershim/network/testing:go_default_library",
"//pkg/kubelet/dockershim/testing:go_default_library",
"//pkg/kubelet/types:go_default_library", "//pkg/kubelet/types:go_default_library",
"//pkg/kubelet/util/cache:go_default_library", "//pkg/kubelet/util/cache:go_default_library",
"//pkg/security/apparmor:go_default_library", "//pkg/security/apparmor:go_default_library",
@ -186,7 +186,6 @@ filegroup(
"//pkg/kubelet/dockershim/metrics:all-srcs", "//pkg/kubelet/dockershim/metrics:all-srcs",
"//pkg/kubelet/dockershim/network:all-srcs", "//pkg/kubelet/dockershim/network:all-srcs",
"//pkg/kubelet/dockershim/remote:all-srcs", "//pkg/kubelet/dockershim/remote:all-srcs",
"//pkg/kubelet/dockershim/testing:all-srcs",
], ],
tags = ["automanaged"], tags = ["automanaged"],
visibility = ["//visibility:public"], visibility = ["//visibility:public"],

View File

@ -164,13 +164,14 @@ func containerToRuntimeAPISandbox(c *dockertypes.Container) (*runtimeapi.PodSand
}, nil }, nil
} }
func checkpointToRuntimeAPISandbox(id string, checkpoint *PodSandboxCheckpoint) *runtimeapi.PodSandbox { func checkpointToRuntimeAPISandbox(id string, checkpoint DockershimCheckpoint) *runtimeapi.PodSandbox {
state := runtimeapi.PodSandboxState_SANDBOX_NOTREADY state := runtimeapi.PodSandboxState_SANDBOX_NOTREADY
_, name, namespace, _, _ := checkpoint.GetData()
return &runtimeapi.PodSandbox{ return &runtimeapi.PodSandbox{
Id: id, Id: id,
Metadata: &runtimeapi.PodSandboxMetadata{ Metadata: &runtimeapi.PodSandboxMetadata{
Name: checkpoint.Name, Name: name,
Namespace: checkpoint.Namespace, Namespace: namespace,
}, },
State: state, State: state,
} }
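To show how the new fill-in style `GetCheckpoint` composes with this converter, here is a hedged sketch written as if it lived in `pkg/kubelet/dockershim`. Only `NewPodSandboxCheckpoint`, `GetCheckpoint`, and `checkpointToRuntimeAPISandbox` come from this PR; the helper itself and its name are hypothetical.

```go
package dockershim

import (
	runtimeapi "k8s.io/kubernetes/pkg/kubelet/apis/cri/runtime/v1alpha2"
)

// sandboxFromCheckpoint recovers a PodSandbox for a sandbox that only exists
// as a checkpoint: allocate an empty checkpoint, let the manager fill it in,
// then convert it to the runtime API shape.
func (ds *dockerService) sandboxFromCheckpoint(id string) (*runtimeapi.PodSandbox, error) {
	cp := NewPodSandboxCheckpoint("", "", &CheckpointData{})
	if err := ds.checkpointManager.GetCheckpoint(id, cp); err != nil {
		return nil, err
	}
	return checkpointToRuntimeAPISandbox(id, cp), nil
}
```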

View File

@ -18,14 +18,9 @@ package dockershim
import ( import (
"encoding/json" "encoding/json"
"fmt"
"hash/fnv"
"path/filepath"
"github.com/golang/glog" "k8s.io/kubernetes/pkg/kubelet/checkpointmanager"
utilstore "k8s.io/kubernetes/pkg/kubelet/util/store" "k8s.io/kubernetes/pkg/kubelet/checkpointmanager/checksum"
utilfs "k8s.io/kubernetes/pkg/util/filesystem"
hashutil "k8s.io/kubernetes/pkg/util/hash"
) )
const ( const (
@ -36,6 +31,11 @@ const (
schemaVersion = "v1" schemaVersion = "v1"
) )
type DockershimCheckpoint interface {
checkpointmanager.Checkpoint
GetData() (string, string, string, []*PortMapping, bool)
}
type Protocol string type Protocol string
// PortMapping is the port mapping configurations of a sandbox. // PortMapping is the port mapping configurations of a sandbox.
@ -65,89 +65,31 @@ type PodSandboxCheckpoint struct {
// Data to checkpoint for pod sandbox. // Data to checkpoint for pod sandbox.
Data *CheckpointData `json:"data,omitempty"` Data *CheckpointData `json:"data,omitempty"`
// Checksum is calculated with fnv hash of the checkpoint object with checksum field set to be zero // Checksum is calculated with fnv hash of the checkpoint object with checksum field set to be zero
CheckSum uint64 `json:"checksum"` Checksum checksum.Checksum `json:"checksum"`
} }
// CheckpointHandler provides the interface to manage PodSandbox checkpoint func NewPodSandboxCheckpoint(namespace, name string, data *CheckpointData) DockershimCheckpoint {
type CheckpointHandler interface {
// CreateCheckpoint persists sandbox checkpoint in CheckpointStore.
CreateCheckpoint(podSandboxID string, checkpoint *PodSandboxCheckpoint) error
// GetCheckpoint retrieves sandbox checkpoint from CheckpointStore.
GetCheckpoint(podSandboxID string) (*PodSandboxCheckpoint, error)
// RemoveCheckpoint removes sandbox checkpoint form CheckpointStore.
// WARNING: RemoveCheckpoint will not return error if checkpoint does not exist.
RemoveCheckpoint(podSandboxID string) error
// ListCheckpoint returns the list of existing checkpoints.
ListCheckpoints() ([]string, error)
}
// PersistentCheckpointHandler is an implementation of CheckpointHandler. It persists checkpoint in CheckpointStore
type PersistentCheckpointHandler struct {
store utilstore.Store
}
func NewPersistentCheckpointHandler(dockershimRootDir string) (CheckpointHandler, error) {
fstore, err := utilstore.NewFileStore(filepath.Join(dockershimRootDir, sandboxCheckpointDir), utilfs.DefaultFs{})
if err != nil {
return nil, err
}
return &PersistentCheckpointHandler{store: fstore}, nil
}
func (handler *PersistentCheckpointHandler) CreateCheckpoint(podSandboxID string, checkpoint *PodSandboxCheckpoint) error {
checkpoint.CheckSum = calculateChecksum(*checkpoint)
blob, err := json.Marshal(checkpoint)
if err != nil {
return err
}
return handler.store.Write(podSandboxID, blob)
}
func (handler *PersistentCheckpointHandler) GetCheckpoint(podSandboxID string) (*PodSandboxCheckpoint, error) {
blob, err := handler.store.Read(podSandboxID)
if err != nil {
return nil, err
}
var checkpoint PodSandboxCheckpoint
//TODO: unmarhsal into a struct with just Version, check version, unmarshal into versioned type.
err = json.Unmarshal(blob, &checkpoint)
if err != nil {
glog.Errorf("Failed to unmarshal checkpoint %q, removing checkpoint. Checkpoint content: %q. ErrMsg: %v", podSandboxID, string(blob), err)
handler.RemoveCheckpoint(podSandboxID)
return nil, fmt.Errorf("failed to unmarshal checkpoint")
}
if checkpoint.CheckSum != calculateChecksum(checkpoint) {
glog.Errorf("Checksum of checkpoint %q is not valid, removing checkpoint", podSandboxID)
handler.RemoveCheckpoint(podSandboxID)
return nil, fmt.Errorf("checkpoint is corrupted")
}
return &checkpoint, nil
}
func (handler *PersistentCheckpointHandler) RemoveCheckpoint(podSandboxID string) error {
return handler.store.Delete(podSandboxID)
}
func (handler *PersistentCheckpointHandler) ListCheckpoints() ([]string, error) {
keys, err := handler.store.List()
if err != nil {
return []string{}, fmt.Errorf("failed to list checkpoint store: %v", err)
}
return keys, nil
}
func NewPodSandboxCheckpoint(namespace, name string) *PodSandboxCheckpoint {
return &PodSandboxCheckpoint{ return &PodSandboxCheckpoint{
Version: schemaVersion, Version: schemaVersion,
Namespace: namespace, Namespace: namespace,
Name: name, Name: name,
Data: &CheckpointData{}, Data: data,
} }
} }
func calculateChecksum(checkpoint PodSandboxCheckpoint) uint64 { func (cp *PodSandboxCheckpoint) MarshalCheckpoint() ([]byte, error) {
checkpoint.CheckSum = 0 cp.Checksum = checksum.New(*cp.Data)
hash := fnv.New32a() return json.Marshal(*cp)
hashutil.DeepHashObject(hash, checkpoint) }
return uint64(hash.Sum32())
func (cp *PodSandboxCheckpoint) UnmarshalCheckpoint(blob []byte) error {
return json.Unmarshal(blob, cp)
}
func (cp *PodSandboxCheckpoint) VerifyChecksum() error {
return cp.Checksum.Verify(*cp.Data)
}
func (cp *PodSandboxCheckpoint) GetData() (string, string, string, []*PortMapping, bool) {
return cp.Version, cp.Name, cp.Namespace, cp.Data.PortMappings, cp.Data.HostNetwork
} }
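The same pattern generalizes: a component defines its own checkpoint type by implementing the three methods `PodSandboxCheckpoint` implements above and delegating integrity to the `checksum` package. A minimal sketch follows; the `MyCheckpoint` type and its fields are invented, only the method set and the `checksum.New`/`Verify` calls mirror the code in this hunk.

```go
package example

import (
	"encoding/json"

	"k8s.io/kubernetes/pkg/kubelet/checkpointmanager/checksum"
)

// MyCheckpoint is a hypothetical component-specific checkpoint.
type MyCheckpoint struct {
	Version  string            `json:"version"`
	Entries  map[string]string `json:"entries"`
	Checksum checksum.Checksum `json:"checksum"`
}

// MarshalCheckpoint stamps a checksum over the payload and serializes the
// whole object, as PodSandboxCheckpoint does above.
func (cp *MyCheckpoint) MarshalCheckpoint() ([]byte, error) {
	cp.Checksum = checksum.New(cp.Entries)
	return json.Marshal(*cp)
}

// UnmarshalCheckpoint restores the object from the stored blob.
func (cp *MyCheckpoint) UnmarshalCheckpoint(blob []byte) error {
	return json.Unmarshal(blob, cp)
}

// VerifyChecksum lets the checkpoint manager detect a corrupted blob after
// reading it back from disk.
func (cp *MyCheckpoint) VerifyChecksum() error {
	return cp.Checksum.Verify(cp.Entries)
}
```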

View File

@ -17,86 +17,17 @@ limitations under the License.
package dockershim package dockershim
import ( import (
"sort"
"testing" "testing"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
utilstore "k8s.io/kubernetes/pkg/kubelet/dockershim/testing"
) )
func NewTestPersistentCheckpointHandler() CheckpointHandler { func TestPodSandboxCheckpoint(t *testing.T) {
return &PersistentCheckpointHandler{store: utilstore.NewMemStore()} data := &CheckpointData{HostNetwork: true}
} checkpoint := NewPodSandboxCheckpoint("ns1", "sandbox1", data)
version, name, namespace, _, hostNetwork := checkpoint.GetData()
func TestPersistentCheckpointHandler(t *testing.T) { assert.Equal(t, schemaVersion, version)
var err error assert.Equal(t, "ns1", namespace)
handler := NewTestPersistentCheckpointHandler() assert.Equal(t, "sandbox1", name)
port80 := int32(80) assert.Equal(t, true, hostNetwork)
port443 := int32(443)
proto := protocolTCP
checkpoint1 := NewPodSandboxCheckpoint("ns1", "sandbox1")
checkpoint1.Data.PortMappings = []*PortMapping{
{
&proto,
&port80,
&port80,
},
{
&proto,
&port443,
&port443,
},
}
checkpoint1.Data.HostNetwork = true
checkpoints := []struct {
podSandboxID string
checkpoint *PodSandboxCheckpoint
expectHostNetwork bool
}{
{
"id1",
checkpoint1,
true,
},
{
"id2",
NewPodSandboxCheckpoint("ns2", "sandbox2"),
false,
},
}
for _, tc := range checkpoints {
// Test CreateCheckpoints
err = handler.CreateCheckpoint(tc.podSandboxID, tc.checkpoint)
assert.NoError(t, err)
// Test GetCheckpoints
checkpoint, err := handler.GetCheckpoint(tc.podSandboxID)
assert.NoError(t, err)
assert.Equal(t, *checkpoint, *tc.checkpoint)
assert.Equal(t, checkpoint.Data.HostNetwork, tc.expectHostNetwork)
}
// Test ListCheckpoints
keys, err := handler.ListCheckpoints()
assert.NoError(t, err)
sort.Strings(keys)
assert.Equal(t, keys, []string{"id1", "id2"})
// Test RemoveCheckpoints
err = handler.RemoveCheckpoint("id1")
assert.NoError(t, err)
// Test Remove Nonexisted Checkpoints
err = handler.RemoveCheckpoint("id1")
assert.NoError(t, err)
// Test ListCheckpoints
keys, err = handler.ListCheckpoints()
assert.NoError(t, err)
assert.Equal(t, keys, []string{"id2"})
// Test Get NonExisted Checkpoint
_, err = handler.GetCheckpoint("id1")
assert.Error(t, err)
} }
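For completeness, a hedged sketch of exercising the real, file-backed manager rather than a test double, as if added alongside the test above. Only `NewCheckpointManager`, `CreateCheckpoint`, `GetCheckpoint`, `NewPodSandboxCheckpoint`, and `GetData` come from this PR; the temp-dir plumbing and test name are illustrative.

```go
package dockershim

import (
	"io/ioutil"
	"os"
	"testing"

	"github.com/stretchr/testify/assert"

	"k8s.io/kubernetes/pkg/kubelet/checkpointmanager"
)

func TestCheckpointRoundTrip(t *testing.T) {
	dir, err := ioutil.TempDir("", "checkpoint")
	assert.NoError(t, err)
	defer os.RemoveAll(dir)

	manager, err := checkpointmanager.NewCheckpointManager(dir)
	assert.NoError(t, err)

	// Write a sandbox checkpoint, then read it back into a fresh object.
	in := NewPodSandboxCheckpoint("ns1", "sandbox1", &CheckpointData{HostNetwork: true})
	assert.NoError(t, manager.CreateCheckpoint("id1", in))

	out := NewPodSandboxCheckpoint("", "", &CheckpointData{})
	assert.NoError(t, manager.GetCheckpoint("id1", out))
	_, name, namespace, _, hostNetwork := out.GetData()
	assert.Equal(t, "sandbox1", name)
	assert.Equal(t, "ns1", namespace)
	assert.True(t, hostNetwork)
}
```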

View File

@ -30,6 +30,7 @@ import (
utilerrors "k8s.io/apimachinery/pkg/util/errors" utilerrors "k8s.io/apimachinery/pkg/util/errors"
runtimeapi "k8s.io/kubernetes/pkg/kubelet/apis/cri/runtime/v1alpha2" runtimeapi "k8s.io/kubernetes/pkg/kubelet/apis/cri/runtime/v1alpha2"
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/dockershim/libdocker" "k8s.io/kubernetes/pkg/kubelet/dockershim/libdocker"
"k8s.io/kubernetes/pkg/kubelet/qos" "k8s.io/kubernetes/pkg/kubelet/qos"
@ -118,7 +119,7 @@ func (ds *dockerService) RunPodSandbox(ctx context.Context, r *runtimeapi.RunPod
}(&err) }(&err)
// Step 3: Create Sandbox Checkpoint. // Step 3: Create Sandbox Checkpoint.
if err = ds.checkpointHandler.CreateCheckpoint(createResp.ID, constructPodSandboxCheckpoint(config)); err != nil { if err = ds.checkpointManager.CreateCheckpoint(createResp.ID, constructPodSandboxCheckpoint(config)); err != nil {
return nil, err return nil, err
} }
@ -189,7 +190,8 @@ func (ds *dockerService) StopPodSandbox(ctx context.Context, r *runtimeapi.StopP
name = metadata.Name name = metadata.Name
hostNetwork = (networkNamespaceMode(inspectResult) == runtimeapi.NamespaceMode_NODE) hostNetwork = (networkNamespaceMode(inspectResult) == runtimeapi.NamespaceMode_NODE)
} else { } else {
checkpoint, checkpointErr := ds.checkpointHandler.GetCheckpoint(podSandboxID) checkpoint := NewPodSandboxCheckpoint("", "", &CheckpointData{})
checkpointErr := ds.checkpointManager.GetCheckpoint(podSandboxID, checkpoint)
// Proceed if both sandbox container and checkpoint could not be found. This means that following // Proceed if both sandbox container and checkpoint could not be found. This means that following
// actions will only have sandbox ID and not have pod namespace and name information. // actions will only have sandbox ID and not have pod namespace and name information.
@ -204,9 +206,7 @@ func (ds *dockerService) StopPodSandbox(ctx context.Context, r *runtimeapi.StopP
fmt.Errorf("failed to get sandbox status: %v", statusErr)}) fmt.Errorf("failed to get sandbox status: %v", statusErr)})
} }
} else { } else {
namespace = checkpoint.Namespace _, name, namespace, _, hostNetwork = checkpoint.GetData()
name = checkpoint.Name
hostNetwork = checkpoint.Data != nil && checkpoint.Data.HostNetwork
} }
} }
@ -237,7 +237,7 @@ func (ds *dockerService) StopPodSandbox(ctx context.Context, r *runtimeapi.StopP
errList = append(errList, err) errList = append(errList, err)
} else { } else {
// remove the checkpoint for any sandbox that is not found in the runtime // remove the checkpoint for any sandbox that is not found in the runtime
ds.checkpointHandler.RemoveCheckpoint(podSandboxID) ds.checkpointManager.RemoveCheckpoint(podSandboxID)
} }
} }
@ -284,7 +284,7 @@ func (ds *dockerService) RemovePodSandbox(ctx context.Context, r *runtimeapi.Rem
} }
// Remove the checkpoint of the sandbox. // Remove the checkpoint of the sandbox.
if err := ds.checkpointHandler.RemoveCheckpoint(podSandboxID); err != nil { if err := ds.checkpointManager.RemoveCheckpoint(podSandboxID); err != nil {
errs = append(errs, err) errs = append(errs, err)
} }
if len(errs) == 0 { if len(errs) == 0 {
@ -465,7 +465,7 @@ func (ds *dockerService) ListPodSandbox(_ context.Context, r *runtimeapi.ListPod
var err error var err error
checkpoints := []string{} checkpoints := []string{}
if filter == nil { if filter == nil {
checkpoints, err = ds.checkpointHandler.ListCheckpoints() checkpoints, err = ds.checkpointManager.ListCheckpoints()
if err != nil { if err != nil {
glog.Errorf("Failed to list checkpoints: %v", err) glog.Errorf("Failed to list checkpoints: %v", err)
} }
@ -501,7 +501,8 @@ func (ds *dockerService) ListPodSandbox(_ context.Context, r *runtimeapi.ListPod
if _, ok := sandboxIDs[id]; ok { if _, ok := sandboxIDs[id]; ok {
continue continue
} }
checkpoint, err := ds.checkpointHandler.GetCheckpoint(id) checkpoint := NewPodSandboxCheckpoint("", "", &CheckpointData{})
err := ds.checkpointManager.GetCheckpoint(id, checkpoint)
if err != nil { if err != nil {
glog.Errorf("Failed to retrieve checkpoint for sandbox %q: %v", id, err) glog.Errorf("Failed to retrieve checkpoint for sandbox %q: %v", id, err)
continue continue
@ -624,20 +625,20 @@ func ipcNamespaceMode(container *dockertypes.ContainerJSON) runtimeapi.Namespace
return runtimeapi.NamespaceMode_POD return runtimeapi.NamespaceMode_POD
} }
func constructPodSandboxCheckpoint(config *runtimeapi.PodSandboxConfig) *PodSandboxCheckpoint { func constructPodSandboxCheckpoint(config *runtimeapi.PodSandboxConfig) checkpointmanager.Checkpoint {
checkpoint := NewPodSandboxCheckpoint(config.Metadata.Namespace, config.Metadata.Name) data := CheckpointData{}
for _, pm := range config.GetPortMappings() { for _, pm := range config.GetPortMappings() {
proto := toCheckpointProtocol(pm.Protocol) proto := toCheckpointProtocol(pm.Protocol)
checkpoint.Data.PortMappings = append(checkpoint.Data.PortMappings, &PortMapping{ data.PortMappings = append(data.PortMappings, &PortMapping{
HostPort: &pm.HostPort, HostPort: &pm.HostPort,
ContainerPort: &pm.ContainerPort, ContainerPort: &pm.ContainerPort,
Protocol: &proto, Protocol: &proto,
}) })
} }
if config.GetLinux().GetSecurityContext().GetNamespaceOptions().GetNetwork() == runtimeapi.NamespaceMode_NODE { if config.GetLinux().GetSecurityContext().GetNamespaceOptions().GetNetwork() == runtimeapi.NamespaceMode_NODE {
checkpoint.Data.HostNetwork = true data.HostNetwork = true
} }
return checkpoint return NewPodSandboxCheckpoint(config.Metadata.Namespace, config.Metadata.Name, &data)
} }
func toCheckpointProtocol(protocol runtimeapi.Protocol) Protocol { func toCheckpointProtocol(protocol runtimeapi.Protocol) Protocol {

View File

@ -20,6 +20,7 @@ import (
"context" "context"
"fmt" "fmt"
"net/http" "net/http"
"path/filepath"
"sync" "sync"
"time" "time"
@ -30,6 +31,7 @@ import (
"k8s.io/api/core/v1" "k8s.io/api/core/v1"
runtimeapi "k8s.io/kubernetes/pkg/kubelet/apis/cri/runtime/v1alpha2" runtimeapi "k8s.io/kubernetes/pkg/kubelet/apis/cri/runtime/v1alpha2"
"k8s.io/kubernetes/pkg/kubelet/apis/kubeletconfig" "k8s.io/kubernetes/pkg/kubelet/apis/kubeletconfig"
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager"
kubecm "k8s.io/kubernetes/pkg/kubelet/cm" kubecm "k8s.io/kubernetes/pkg/kubelet/cm"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/dockershim/cm" "k8s.io/kubernetes/pkg/kubelet/dockershim/cm"
@ -191,7 +193,8 @@ func NewDockerService(config *ClientConfig, podSandboxImage string, streamingCon
client := NewDockerClientFromConfig(config) client := NewDockerClientFromConfig(config)
c := libdocker.NewInstrumentedInterface(client) c := libdocker.NewInstrumentedInterface(client)
checkpointHandler, err := NewPersistentCheckpointHandler(dockershimRootDir)
checkpointManager, err := checkpointmanager.NewCheckpointManager(filepath.Join(dockershimRootDir, sandboxCheckpointDir))
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -205,7 +208,7 @@ func NewDockerService(config *ClientConfig, podSandboxImage string, streamingCon
execHandler: &NativeExecHandler{}, execHandler: &NativeExecHandler{},
}, },
containerManager: cm.NewContainerManager(cgroupsName, client), containerManager: cm.NewContainerManager(cgroupsName, client),
checkpointHandler: checkpointHandler, checkpointManager: checkpointManager,
disableSharedPID: disableSharedPID, disableSharedPID: disableSharedPID,
networkReady: make(map[string]bool), networkReady: make(map[string]bool),
} }
@ -293,7 +296,7 @@ type dockerService struct {
containerManager cm.ContainerManager containerManager cm.ContainerManager
// cgroup driver used by Docker runtime. // cgroup driver used by Docker runtime.
cgroupDriver string cgroupDriver string
checkpointHandler CheckpointHandler checkpointManager checkpointmanager.CheckpointManager
// caches the version of the runtime. // caches the version of the runtime.
// To be compatible with multiple docker versions, we need to perform // To be compatible with multiple docker versions, we need to perform
// version checking for some operations. Use this cache to avoid querying // version checking for some operations. Use this cache to avoid querying
@ -365,7 +368,8 @@ func (ds *dockerService) GetNetNS(podSandboxID string) (string, error) {
// GetPodPortMappings returns the port mappings of the given podSandbox ID. // GetPodPortMappings returns the port mappings of the given podSandbox ID.
func (ds *dockerService) GetPodPortMappings(podSandboxID string) ([]*hostport.PortMapping, error) { func (ds *dockerService) GetPodPortMappings(podSandboxID string) ([]*hostport.PortMapping, error) {
// TODO: get portmappings from docker labels for backward compatibility // TODO: get portmappings from docker labels for backward compatibility
checkpoint, err := ds.checkpointHandler.GetCheckpoint(podSandboxID) checkpoint := NewPodSandboxCheckpoint("", "", &CheckpointData{})
err := ds.checkpointManager.GetCheckpoint(podSandboxID, checkpoint)
// Return empty portMappings if checkpoint is not found // Return empty portMappings if checkpoint is not found
if err != nil { if err != nil {
if err == utilstore.ErrKeyNotFound { if err == utilstore.ErrKeyNotFound {
@ -373,9 +377,9 @@ func (ds *dockerService) GetPodPortMappings(podSandboxID string) ([]*hostport.Po
} }
return nil, err return nil, err
} }
_, _, _, checkpointedPortMappings, _ := checkpoint.GetData()
portMappings := make([]*hostport.PortMapping, 0, len(checkpoint.Data.PortMappings)) portMappings := make([]*hostport.PortMapping, 0, len(checkpointedPortMappings))
for _, pm := range checkpoint.Data.PortMappings { for _, pm := range checkpointedPortMappings {
proto := toAPIProtocol(*pm.Protocol) proto := toAPIProtocol(*pm.Protocol)
portMappings = append(portMappings, &hostport.PortMapping{ portMappings = append(portMappings, &hostport.PortMapping{
HostPort: *pm.HostPort, HostPort: *pm.HostPort,

View File

@ -30,6 +30,7 @@ import (
"k8s.io/apimachinery/pkg/util/clock" "k8s.io/apimachinery/pkg/util/clock"
runtimeapi "k8s.io/kubernetes/pkg/kubelet/apis/cri/runtime/v1alpha2" runtimeapi "k8s.io/kubernetes/pkg/kubelet/apis/cri/runtime/v1alpha2"
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager"
containertest "k8s.io/kubernetes/pkg/kubelet/container/testing" containertest "k8s.io/kubernetes/pkg/kubelet/container/testing"
"k8s.io/kubernetes/pkg/kubelet/dockershim/libdocker" "k8s.io/kubernetes/pkg/kubelet/dockershim/libdocker"
"k8s.io/kubernetes/pkg/kubelet/dockershim/network" "k8s.io/kubernetes/pkg/kubelet/dockershim/network"
@ -43,15 +44,50 @@ func newTestNetworkPlugin(t *testing.T) *nettest.MockNetworkPlugin {
return nettest.NewMockNetworkPlugin(ctrl) return nettest.NewMockNetworkPlugin(ctrl)
} }
type mockCheckpointManager struct {
checkpoint map[string]*PodSandboxCheckpoint
}
func (ckm *mockCheckpointManager) CreateCheckpoint(checkpointKey string, checkpoint checkpointmanager.Checkpoint) error {
ckm.checkpoint[checkpointKey] = checkpoint.(*PodSandboxCheckpoint)
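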
return nil
}
func (ckm *mockCheckpointManager) GetCheckpoint(checkpointKey string, checkpoint checkpointmanager.Checkpoint) error {
*(checkpoint.(*PodSandboxCheckpoint)) = *(ckm.checkpoint[checkpointKey])
return nil
}
func (ckm *mockCheckpointManager) RemoveCheckpoint(checkpointKey string) error {
_, ok := ckm.checkpoint[checkpointKey]
if ok {
delete(ckm.checkpoint, "moo")
}
return nil
}
func (ckm *mockCheckpointManager) ListCheckpoints() ([]string, error) {
var keys []string
for key := range ckm.checkpoint {
keys = append(keys, key)
}
return keys, nil
}
func newMockCheckpointManager() checkpointmanager.CheckpointManager {
return &mockCheckpointManager{checkpoint: make(map[string]*PodSandboxCheckpoint)}
}
func newTestDockerService() (*dockerService, *libdocker.FakeDockerClient, *clock.FakeClock) { func newTestDockerService() (*dockerService, *libdocker.FakeDockerClient, *clock.FakeClock) {
fakeClock := clock.NewFakeClock(time.Time{}) fakeClock := clock.NewFakeClock(time.Time{})
c := libdocker.NewFakeDockerClient().WithClock(fakeClock).WithVersion("1.11.2", "1.23").WithRandSource(rand.NewSource(0)) c := libdocker.NewFakeDockerClient().WithClock(fakeClock).WithVersion("1.11.2", "1.23").WithRandSource(rand.NewSource(0))
pm := network.NewPluginManager(&network.NoopNetworkPlugin{}) pm := network.NewPluginManager(&network.NoopNetworkPlugin{})
ckm := newMockCheckpointManager()
return &dockerService{ return &dockerService{
client: c, client: c,
os: &containertest.FakeOS{}, os: &containertest.FakeOS{},
network: pm, network: pm,
checkpointHandler: NewTestPersistentCheckpointHandler(), checkpointManager: ckm,
networkReady: make(map[string]bool), networkReady: make(map[string]bool),
}, c, fakeClock }, c, fakeClock
} }

View File

@ -61,6 +61,7 @@ import (
kubeletconfiginternal "k8s.io/kubernetes/pkg/kubelet/apis/kubeletconfig" kubeletconfiginternal "k8s.io/kubernetes/pkg/kubelet/apis/kubeletconfig"
"k8s.io/kubernetes/pkg/kubelet/cadvisor" "k8s.io/kubernetes/pkg/kubelet/cadvisor"
kubeletcertificate "k8s.io/kubernetes/pkg/kubelet/certificate" kubeletcertificate "k8s.io/kubernetes/pkg/kubelet/certificate"
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager"
"k8s.io/kubernetes/pkg/kubelet/cm" "k8s.io/kubernetes/pkg/kubelet/cm"
"k8s.io/kubernetes/pkg/kubelet/config" "k8s.io/kubernetes/pkg/kubelet/config"
"k8s.io/kubernetes/pkg/kubelet/configmap" "k8s.io/kubernetes/pkg/kubelet/configmap"
@ -553,8 +554,15 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
klet.livenessManager = proberesults.NewManager() klet.livenessManager = proberesults.NewManager()
klet.podCache = kubecontainer.NewCache() klet.podCache = kubecontainer.NewCache()
var checkpointManager checkpointmanager.CheckpointManager
if bootstrapCheckpointPath != "" {
checkpointManager, err = checkpointmanager.NewCheckpointManager(bootstrapCheckpointPath)
if err != nil {
return nil, fmt.Errorf("failed to initialize checkpoint manager: %+v", err)
}
}
// podManager is also responsible for keeping secretManager and configMapManager contents up-to-date. // podManager is also responsible for keeping secretManager and configMapManager contents up-to-date.
klet.podManager = kubepod.NewBasicPodManager(kubepod.NewBasicMirrorClient(klet.kubeClient), secretManager, configMapManager) klet.podManager = kubepod.NewBasicPodManager(kubepod.NewBasicMirrorClient(klet.kubeClient), secretManager, configMapManager, checkpointManager)
if remoteRuntimeEndpoint != "" { if remoteRuntimeEndpoint != "" {
// remoteImageEndpoint is same as remoteRuntimeEndpoint if not explicitly specified // remoteImageEndpoint is same as remoteRuntimeEndpoint if not explicitly specified

View File

@ -215,7 +215,7 @@ func newTestKubeletWithImageList(
kubelet.secretManager = secretManager kubelet.secretManager = secretManager
configMapManager := configmap.NewSimpleConfigMapManager(kubelet.kubeClient) configMapManager := configmap.NewSimpleConfigMapManager(kubelet.kubeClient)
kubelet.configMapManager = configMapManager kubelet.configMapManager = configMapManager
kubelet.podManager = kubepod.NewBasicPodManager(fakeMirrorClient, kubelet.secretManager, kubelet.configMapManager) kubelet.podManager = kubepod.NewBasicPodManager(fakeMirrorClient, kubelet.secretManager, kubelet.configMapManager, podtest.NewMockCheckpointManager())
kubelet.statusManager = status.NewManager(fakeKubeClient, kubelet.podManager, &statustest.FakePodDeletionSafetyProvider{}) kubelet.statusManager = status.NewManager(fakeKubeClient, kubelet.podManager, &statustest.FakePodDeletionSafetyProvider{})
kubelet.containerRuntime = fakeRuntime kubelet.containerRuntime = fakeRuntime

View File

@ -52,7 +52,7 @@ func TestGetVolumeExec(t *testing.T) {
fakeSecretManager := secret.NewFakeManager() fakeSecretManager := secret.NewFakeManager()
fakeConfigMapManager := configmap.NewFakeManager() fakeConfigMapManager := configmap.NewFakeManager()
podManager := kubepod.NewBasicPodManager( podManager := kubepod.NewBasicPodManager(
podtest.NewFakeMirrorClient(), fakeSecretManager, fakeConfigMapManager) podtest.NewFakeMirrorClient(), fakeSecretManager, fakeConfigMapManager, podtest.NewMockCheckpointManager())
podManager.SetPods(pods) podManager.SetPods(pods)
// Prepare fake /var/lib/kubelet // Prepare fake /var/lib/kubelet

View File

@ -15,6 +15,7 @@ go_library(
importpath = "k8s.io/kubernetes/pkg/kubelet/pod", importpath = "k8s.io/kubernetes/pkg/kubelet/pod",
deps = [ deps = [
"//pkg/kubelet/checkpoint:go_default_library", "//pkg/kubelet/checkpoint:go_default_library",
"//pkg/kubelet/checkpointmanager:go_default_library",
"//pkg/kubelet/configmap:go_default_library", "//pkg/kubelet/configmap:go_default_library",
"//pkg/kubelet/container:go_default_library", "//pkg/kubelet/container:go_default_library",
"//pkg/kubelet/secret:go_default_library", "//pkg/kubelet/secret:go_default_library",

View File

@ -24,6 +24,7 @@ import (
"k8s.io/api/core/v1" "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/types"
"k8s.io/kubernetes/pkg/kubelet/checkpoint" "k8s.io/kubernetes/pkg/kubelet/checkpoint"
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager"
"k8s.io/kubernetes/pkg/kubelet/configmap" "k8s.io/kubernetes/pkg/kubelet/configmap"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/secret" "k8s.io/kubernetes/pkg/kubelet/secret"
@ -121,18 +122,18 @@ type basicManager struct {
// basicManager is keeping secretManager and configMapManager up-to-date. // basicManager is keeping secretManager and configMapManager up-to-date.
secretManager secret.Manager secretManager secret.Manager
configMapManager configmap.Manager configMapManager configmap.Manager
checkpointManager checkpoint.Manager checkpointManager checkpointmanager.CheckpointManager
// A mirror pod client to create/delete mirror pods. // A mirror pod client to create/delete mirror pods.
MirrorClient MirrorClient
} }
// NewBasicPodManager returns a functional Manager. // NewBasicPodManager returns a functional Manager.
func NewBasicPodManager(client MirrorClient, secretManager secret.Manager, configMapManager configmap.Manager) Manager { func NewBasicPodManager(client MirrorClient, secretManager secret.Manager, configMapManager configmap.Manager, cpm checkpointmanager.CheckpointManager) Manager {
pm := &basicManager{} pm := &basicManager{}
pm.secretManager = secretManager pm.secretManager = secretManager
pm.configMapManager = configMapManager pm.configMapManager = configMapManager
pm.checkpointManager = checkpoint.GetInstance() pm.checkpointManager = cpm
pm.MirrorClient = client pm.MirrorClient = client
pm.SetPods(nil) pm.SetPods(nil)
return pm return pm
@ -161,7 +162,7 @@ func (pm *basicManager) UpdatePod(pod *v1.Pod) {
defer pm.lock.Unlock() defer pm.lock.Unlock()
pm.updatePodsInternal(pod) pm.updatePodsInternal(pod)
if pm.checkpointManager != nil { if pm.checkpointManager != nil {
if err := pm.checkpointManager.WritePod(pod); err != nil { if err := checkpoint.WritePod(pm.checkpointManager, pod); err != nil {
glog.Errorf("Error writing checkpoint for pod: %v", pod.GetName()) glog.Errorf("Error writing checkpoint for pod: %v", pod.GetName())
} }
} }
@ -224,7 +225,7 @@ func (pm *basicManager) DeletePod(pod *v1.Pod) {
delete(pm.podByFullName, podFullName) delete(pm.podByFullName, podFullName)
} }
if pm.checkpointManager != nil { if pm.checkpointManager != nil {
if err := pm.checkpointManager.DeletePod(pod); err != nil { if err := checkpoint.DeletePod(pm.checkpointManager, pod); err != nil {
glog.Errorf("Error deleting checkpoint for pod: %v", pod.GetName()) glog.Errorf("Error deleting checkpoint for pod: %v", pod.GetName())
} }
} }
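The pod-level `checkpoint` package now exposes package-level helpers that operate on any `CheckpointManager`, which is what the hunks above rely on. A minimal sketch of the write/load/delete cycle; `WritePod`, `LoadPods`, and `DeletePod` appear in this diff, while the pod contents, directory handling, and helper name are invented.

```go
package example

import (
	"fmt"

	"k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

	"k8s.io/kubernetes/pkg/kubelet/checkpoint"
	"k8s.io/kubernetes/pkg/kubelet/checkpointmanager"
)

// checkpointPods persists a pod, lists everything currently checkpointed and
// removes the pod again. dir is assumed to be the kubelet's bootstrap
// checkpoint directory.
func checkpointPods(dir string) error {
	cpm, err := checkpointmanager.NewCheckpointManager(dir)
	if err != nil {
		return err
	}
	pod := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "nginx", Namespace: "default"}}
	if err := checkpoint.WritePod(cpm, pod); err != nil {
		return err
	}
	pods, err := checkpoint.LoadPods(cpm)
	if err != nil {
		return err
	}
	fmt.Printf("checkpointed pods: %d\n", len(pods))
	return checkpoint.DeletePod(cpm, pod)
}
```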

View File

@ -34,7 +34,7 @@ func newTestManager() (*basicManager, *podtest.FakeMirrorClient) {
fakeMirrorClient := podtest.NewFakeMirrorClient() fakeMirrorClient := podtest.NewFakeMirrorClient()
secretManager := secret.NewFakeManager() secretManager := secret.NewFakeManager()
configMapManager := configmap.NewFakeManager() configMapManager := configmap.NewFakeManager()
manager := NewBasicPodManager(fakeMirrorClient, secretManager, configMapManager).(*basicManager) manager := NewBasicPodManager(fakeMirrorClient, secretManager, configMapManager, podtest.NewMockCheckpointManager()).(*basicManager)
return manager, fakeMirrorClient return manager, fakeMirrorClient
} }

View File

@ -13,6 +13,8 @@ go_library(
], ],
importpath = "k8s.io/kubernetes/pkg/kubelet/pod/testing", importpath = "k8s.io/kubernetes/pkg/kubelet/pod/testing",
deps = [ deps = [
"//pkg/kubelet/checkpoint:go_default_library",
"//pkg/kubelet/checkpointmanager:go_default_library",
"//pkg/kubelet/container:go_default_library", "//pkg/kubelet/container:go_default_library",
"//pkg/kubelet/types:go_default_library", "//pkg/kubelet/types:go_default_library",
"//vendor/github.com/stretchr/testify/mock:go_default_library", "//vendor/github.com/stretchr/testify/mock:go_default_library",

View File

@ -21,6 +21,8 @@ import (
"k8s.io/api/core/v1" "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/sets"
cp "k8s.io/kubernetes/pkg/kubelet/checkpoint"
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
) )
@ -81,3 +83,37 @@ func (fmc *FakeMirrorClient) GetCounts(podFullName string) (int, int) {
defer fmc.mirrorPodLock.RUnlock() defer fmc.mirrorPodLock.RUnlock()
return fmc.createCounts[podFullName], fmc.deleteCounts[podFullName] return fmc.createCounts[podFullName], fmc.deleteCounts[podFullName]
} }
type MockCheckpointManager struct {
checkpoint map[string]*cp.Data
}
func (ckm *MockCheckpointManager) CreateCheckpoint(checkpointKey string, checkpoint checkpointmanager.Checkpoint) error {
ckm.checkpoint[checkpointKey] = (checkpoint.(*cp.Data))
return nil
}
func (ckm *MockCheckpointManager) GetCheckpoint(checkpointKey string, checkpoint checkpointmanager.Checkpoint) error {
*(checkpoint.(*cp.Data)) = *(ckm.checkpoint[checkpointKey])
return nil
}
func (ckm *MockCheckpointManager) RemoveCheckpoint(checkpointKey string) error {
_, ok := ckm.checkpoint[checkpointKey]
if ok {
delete(ckm.checkpoint, "moo")
}
return nil
}
func (ckm *MockCheckpointManager) ListCheckpoints() ([]string, error) {
var keys []string
for key := range ckm.checkpoint {
keys = append(keys, key)
}
return keys, nil
}
func NewMockCheckpointManager() checkpointmanager.CheckpointManager {
return &MockCheckpointManager{checkpoint: make(map[string]*cp.Data)}
}

View File

@ -99,7 +99,7 @@ func setTestProbe(pod *v1.Pod, probeType probeType, probeSpec v1.Probe) {
func newTestManager() *manager { func newTestManager() *manager {
refManager := kubecontainer.NewRefManager() refManager := kubecontainer.NewRefManager()
refManager.SetRef(testContainerID, &v1.ObjectReference{}) // Suppress prober warnings. refManager.SetRef(testContainerID, &v1.ObjectReference{}) // Suppress prober warnings.
podManager := kubepod.NewBasicPodManager(nil, nil, nil) podManager := kubepod.NewBasicPodManager(nil, nil, nil, nil)
// Add test pod to pod manager, so that status manager can get the pod from pod manager if needed. // Add test pod to pod manager, so that status manager can get the pod from pod manager if needed.
podManager.AddPod(getTestPod()) podManager.AddPod(getTestPod())
m := NewManager( m := NewManager(

View File

@ -118,7 +118,7 @@ func TestDoProbe(t *testing.T) {
} }
// Clean up. // Clean up.
m.statusManager = status.NewManager(&fake.Clientset{}, kubepod.NewBasicPodManager(nil, nil, nil), &statustest.FakePodDeletionSafetyProvider{}) m.statusManager = status.NewManager(&fake.Clientset{}, kubepod.NewBasicPodManager(nil, nil, nil, nil), &statustest.FakePodDeletionSafetyProvider{})
resultsManager(m, probeType).Remove(testContainerID) resultsManager(m, probeType).Remove(testContainerID)
} }
} }

View File

@ -63,7 +63,7 @@ func TestRunOnce(t *testing.T) {
fakeSecretManager := secret.NewFakeManager() fakeSecretManager := secret.NewFakeManager()
fakeConfigMapManager := configmap.NewFakeManager() fakeConfigMapManager := configmap.NewFakeManager()
podManager := kubepod.NewBasicPodManager( podManager := kubepod.NewBasicPodManager(
podtest.NewFakeMirrorClient(), fakeSecretManager, fakeConfigMapManager) podtest.NewFakeMirrorClient(), fakeSecretManager, fakeConfigMapManager, podtest.NewMockCheckpointManager())
fakeRuntime := &containertest.FakeRuntime{} fakeRuntime := &containertest.FakeRuntime{}
basePath, err := utiltesting.MkTmpdir("kubelet") basePath, err := utiltesting.MkTmpdir("kubelet")
if err != nil { if err != nil {

View File

@ -75,7 +75,7 @@ func (m *manager) testSyncBatch() {
} }
func newTestManager(kubeClient clientset.Interface) *manager { func newTestManager(kubeClient clientset.Interface) *manager {
podManager := kubepod.NewBasicPodManager(podtest.NewFakeMirrorClient(), kubesecret.NewFakeManager(), kubeconfigmap.NewFakeManager()) podManager := kubepod.NewBasicPodManager(podtest.NewFakeMirrorClient(), kubesecret.NewFakeManager(), kubeconfigmap.NewFakeManager(), podtest.NewMockCheckpointManager())
podManager.AddPod(getTestPod()) podManager.AddPod(getTestPod())
return NewManager(kubeClient, podManager, &statustest.FakePodDeletionSafetyProvider{}).(*manager) return NewManager(kubeClient, podManager, &statustest.FakePodDeletionSafetyProvider{}).(*manager)
} }

View File

@ -523,7 +523,7 @@ func createDswpWithVolume(t *testing.T, pv *v1.PersistentVolume, pvc *v1.Persist
fakeSecretManager := secret.NewFakeManager() fakeSecretManager := secret.NewFakeManager()
fakeConfigMapManager := configmap.NewFakeManager() fakeConfigMapManager := configmap.NewFakeManager()
fakePodManager := kubepod.NewBasicPodManager( fakePodManager := kubepod.NewBasicPodManager(
podtest.NewFakeMirrorClient(), fakeSecretManager, fakeConfigMapManager) podtest.NewFakeMirrorClient(), fakeSecretManager, fakeConfigMapManager, podtest.NewMockCheckpointManager())
fakesDSW := cache.NewDesiredStateOfWorld(fakeVolumePluginMgr) fakesDSW := cache.NewDesiredStateOfWorld(fakeVolumePluginMgr)
fakeASW := cache.NewActualStateOfWorld("fake", fakeVolumePluginMgr) fakeASW := cache.NewActualStateOfWorld("fake", fakeVolumePluginMgr)

View File

@ -55,7 +55,8 @@ func TestGetMountedVolumesForPodAndGetVolumesInUse(t *testing.T) {
t.Fatalf("can't make a temp dir: %v", err) t.Fatalf("can't make a temp dir: %v", err)
} }
defer os.RemoveAll(tmpDir) defer os.RemoveAll(tmpDir)
podManager := kubepod.NewBasicPodManager(podtest.NewFakeMirrorClient(), secret.NewFakeManager(), configmap.NewFakeManager()) cpm := podtest.NewMockCheckpointManager()
podManager := kubepod.NewBasicPodManager(podtest.NewFakeMirrorClient(), secret.NewFakeManager(), configmap.NewFakeManager(), cpm)
node, pod, pv, claim := createObjects() node, pod, pv, claim := createObjects()
kubeClient := fake.NewSimpleClientset(node, pod, pv, claim) kubeClient := fake.NewSimpleClientset(node, pod, pv, claim)
@ -97,7 +98,8 @@ func TestInitialPendingVolumesForPodAndGetVolumesInUse(t *testing.T) {
t.Fatalf("can't make a temp dir: %v", err) t.Fatalf("can't make a temp dir: %v", err)
} }
defer os.RemoveAll(tmpDir) defer os.RemoveAll(tmpDir)
podManager := kubepod.NewBasicPodManager(podtest.NewFakeMirrorClient(), secret.NewFakeManager(), configmap.NewFakeManager()) cpm := podtest.NewMockCheckpointManager()
podManager := kubepod.NewBasicPodManager(podtest.NewFakeMirrorClient(), secret.NewFakeManager(), configmap.NewFakeManager(), cpm)
node, pod, pv, claim := createObjects() node, pod, pv, claim := createObjects()
claim.Status = v1.PersistentVolumeClaimStatus{ claim.Status = v1.PersistentVolumeClaimStatus{
@ -135,7 +137,8 @@ func TestGetExtraSupplementalGroupsForPod(t *testing.T) {
t.Fatalf("can't make a temp dir: %v", err) t.Fatalf("can't make a temp dir: %v", err)
} }
defer os.RemoveAll(tmpDir) defer os.RemoveAll(tmpDir)
podManager := kubepod.NewBasicPodManager(podtest.NewFakeMirrorClient(), secret.NewFakeManager(), configmap.NewFakeManager()) cpm := podtest.NewMockCheckpointManager()
podManager := kubepod.NewBasicPodManager(podtest.NewFakeMirrorClient(), secret.NewFakeManager(), configmap.NewFakeManager(), cpm)
node, pod, _, claim := createObjects() node, pod, _, claim := createObjects()