From d62bd9ef6571c13b134162da2f7ae41785c6a59e Mon Sep 17 00:00:00 2001 From: vikaschoudhary16 Date: Mon, 20 Nov 2017 03:43:52 -0500 Subject: [PATCH 1/2] Node-level Checkpointing manager --- hack/.golint_failures | 3 + pkg/kubelet/BUILD | 1 + pkg/kubelet/checkpointmanager/BUILD | 48 ++++ pkg/kubelet/checkpointmanager/README.md | 24 ++ .../checkpointmanager/checkpoint_manager.go | 106 ++++++++ .../checkpoint_manager_test.go | 246 ++++++++++++++++++ pkg/kubelet/checkpointmanager/checksum/BUILD | 26 ++ .../checkpointmanager/checksum/checksum.go | 46 ++++ .../errors}/BUILD | 4 +- .../checkpointmanager/errors/errors.go | 22 ++ pkg/kubelet/checkpointmanager/testing/BUILD | 28 ++ .../example_checkpoint_formats/v1/BUILD | 23 ++ .../example_checkpoint_formats/v1/types.go | 62 +++++ .../testing/util.go | 0 pkg/kubelet/cm/devicemanager/BUILD | 13 +- pkg/kubelet/cm/devicemanager/checkpoint/BUILD | 26 ++ .../cm/devicemanager/checkpoint/checkpoint.go | 81 ++++++ pkg/kubelet/cm/devicemanager/manager.go | 65 ++--- pkg/kubelet/cm/devicemanager/manager_test.go | 75 +++--- pkg/kubelet/cm/devicemanager/pod_devices.go | 18 +- pkg/kubelet/dockershim/BUILD | 6 +- pkg/kubelet/dockershim/convert.go | 7 +- pkg/kubelet/dockershim/docker_checkpoint.go | 108 ++------ .../dockershim/docker_checkpoint_test.go | 85 +----- pkg/kubelet/dockershim/docker_sandbox.go | 29 ++- pkg/kubelet/dockershim/docker_service.go | 18 +- pkg/kubelet/dockershim/docker_service_test.go | 38 ++- 27 files changed, 927 insertions(+), 281 deletions(-) create mode 100644 pkg/kubelet/checkpointmanager/BUILD create mode 100644 pkg/kubelet/checkpointmanager/README.md create mode 100644 pkg/kubelet/checkpointmanager/checkpoint_manager.go create mode 100644 pkg/kubelet/checkpointmanager/checkpoint_manager_test.go create mode 100644 pkg/kubelet/checkpointmanager/checksum/BUILD create mode 100644 pkg/kubelet/checkpointmanager/checksum/checksum.go rename pkg/kubelet/{dockershim/testing => checkpointmanager/errors}/BUILD (79%) create mode 100644 pkg/kubelet/checkpointmanager/errors/errors.go create mode 100644 pkg/kubelet/checkpointmanager/testing/BUILD create mode 100644 pkg/kubelet/checkpointmanager/testing/example_checkpoint_formats/v1/BUILD create mode 100644 pkg/kubelet/checkpointmanager/testing/example_checkpoint_formats/v1/types.go rename pkg/kubelet/{dockershim => checkpointmanager}/testing/util.go (100%) create mode 100644 pkg/kubelet/cm/devicemanager/checkpoint/BUILD create mode 100644 pkg/kubelet/cm/devicemanager/checkpoint/checkpoint.go diff --git a/hack/.golint_failures b/hack/.golint_failures index 1cec2f3c515..824de2477d9 100644 --- a/hack/.golint_failures +++ b/hack/.golint_failures @@ -163,6 +163,9 @@ pkg/kubelet/apis/kubeletconfig pkg/kubelet/apis/kubeletconfig/v1beta1 pkg/kubelet/cadvisor pkg/kubelet/cadvisor/testing +pkg/kubelet/checkpoint +pkg/kubelet/checkpointmanager/checksum +pkg/kubelet/checkpointmanager/testing/example_checkpoint_formats/v1 pkg/kubelet/client pkg/kubelet/cm pkg/kubelet/cm/util diff --git a/pkg/kubelet/BUILD b/pkg/kubelet/BUILD index 891a61bdc05..b7657b78239 100644 --- a/pkg/kubelet/BUILD +++ b/pkg/kubelet/BUILD @@ -246,6 +246,7 @@ filegroup( "//pkg/kubelet/cadvisor:all-srcs", "//pkg/kubelet/certificate:all-srcs", "//pkg/kubelet/checkpoint:all-srcs", + "//pkg/kubelet/checkpointmanager:all-srcs", "//pkg/kubelet/client:all-srcs", "//pkg/kubelet/cm:all-srcs", "//pkg/kubelet/config:all-srcs", diff --git a/pkg/kubelet/checkpointmanager/BUILD b/pkg/kubelet/checkpointmanager/BUILD new file mode 100644 index 00000000000..c27bb8f5898 --- /dev/null +++ b/pkg/kubelet/checkpointmanager/BUILD @@ -0,0 +1,48 @@ +package(default_visibility = ["//visibility:public"]) + +load( + "@io_bazel_rules_go//go:def.bzl", + "go_library", + "go_test", +) + +go_library( + name = "go_default_library", + srcs = ["checkpoint_manager.go"], + importpath = "k8s.io/kubernetes/pkg/kubelet/checkpointmanager", + deps = [ + "//pkg/kubelet/checkpointmanager/errors:go_default_library", + "//pkg/kubelet/util/store:go_default_library", + "//pkg/util/filesystem:go_default_library", + ], +) + +go_test( + name = "go_default_test", + srcs = ["checkpoint_manager_test.go"], + embed = [":go_default_library"], + deps = [ + "//pkg/kubelet/checkpointmanager/errors:go_default_library", + "//pkg/kubelet/checkpointmanager/testing:go_default_library", + "//pkg/kubelet/checkpointmanager/testing/example_checkpoint_formats/v1:go_default_library", + "//vendor/github.com/stretchr/testify/assert:go_default_library", + ], +) + +filegroup( + name = "package-srcs", + srcs = glob(["**"]), + tags = ["automanaged"], + visibility = ["//visibility:private"], +) + +filegroup( + name = "all-srcs", + srcs = [ + ":package-srcs", + "//pkg/kubelet/checkpointmanager/checksum:all-srcs", + "//pkg/kubelet/checkpointmanager/errors:all-srcs", + "//pkg/kubelet/checkpointmanager/testing:all-srcs", + ], + tags = ["automanaged"], +) diff --git a/pkg/kubelet/checkpointmanager/README.md b/pkg/kubelet/checkpointmanager/README.md new file mode 100644 index 00000000000..d5b6fd6eb74 --- /dev/null +++ b/pkg/kubelet/checkpointmanager/README.md @@ -0,0 +1,24 @@ +## DISCLAIMER +Sig-Node community has reached a general consensus, as a best practice, to +avoid introducing any new checkpointing support. We reached this understanding +after struggling with some hard-to-debug issues in the production environments +caused by the checkpointing. + +## Introduction +This folder contains a framework & primitives, Checkpointing Manager, which is +used by several other Kubelet submodules, `dockershim`, `devicemanager`, `pods` +and `cpumanager`, to implement checkpointing at each submodule level. As already +explained in above `Disclaimer` section, think twice before introducing any further +checkpointing in Kubelet. If still checkpointing is required, then this folder +provides the common APIs and the framework for implementing checkpointing. +Using same APIs across all the submodules will help maintaining consistency at +Kubelet level. + +Below is the history of checkpointing support in Kubelet. + +| Package | First checkpointing support merged on | PR link | +| ------- | --------------------------------------| ------- | +|kubelet/dockershim | Feb 3, 2017 | [[CRI] Implement Dockershim Checkpoint](https://github.com/kubernetes/kubernetes/pull/39903) +|devicemanager| Sep 6, 2017 | [Deviceplugin checkpoint](https://github.com/kubernetes/kubernetes/pull/51744) +| kubelet/pod | Nov 22, 2017 | [Initial basic bootstrap-checkpoint support](https://github.com/kubernetes/kubernetes/pull/50984) +|cpumanager| Oct 27, 2017 |[Add file backed state to cpu manager ](https://github.com/kubernetes/kubernetes/pull/54408) diff --git a/pkg/kubelet/checkpointmanager/checkpoint_manager.go b/pkg/kubelet/checkpointmanager/checkpoint_manager.go new file mode 100644 index 00000000000..1d078357bff --- /dev/null +++ b/pkg/kubelet/checkpointmanager/checkpoint_manager.go @@ -0,0 +1,106 @@ +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package checkpointmanager + +import ( + "fmt" + + "k8s.io/kubernetes/pkg/kubelet/checkpointmanager/errors" + utilstore "k8s.io/kubernetes/pkg/kubelet/util/store" + utilfs "k8s.io/kubernetes/pkg/util/filesystem" +) + +// Checkpoint provides the process checkpoint data +type Checkpoint interface { + MarshalCheckpoint() ([]byte, error) + UnmarshalCheckpoint(blob []byte) error + VerifyChecksum() error +} + +// CheckpointManager provides the interface to manage checkpoint +type CheckpointManager interface { + // CreateCheckpoint persists checkpoint in CheckpointStore. checkpointKey is the key for utilstore to locate checkpoint. + // For file backed utilstore, checkpointKey is the file name to write the checkpoint data. + CreateCheckpoint(checkpointKey string, checkpoint Checkpoint) error + // GetCheckpoint retrieves checkpoint from CheckpointStore. + GetCheckpoint(checkpointKey string, checkpoint Checkpoint) error + // WARNING: RemoveCheckpoint will not return error if checkpoint does not exist. + RemoveCheckpoint(checkpointKey string) error + // ListCheckpoint returns the list of existing checkpoints. + ListCheckpoints() ([]string, error) +} + +// impl is an implementation of CheckpointManager. It persists checkpoint in CheckpointStore +type impl struct { + path string + store utilstore.Store +} + +func NewCheckpointManager(checkpointDir string) (CheckpointManager, error) { + fstore, err := utilstore.NewFileStore(checkpointDir, utilfs.DefaultFs{}) + if err != nil { + return nil, err + } + + return &impl{path: checkpointDir, store: fstore}, nil +} + +// CreateCheckpoint persists checkpoint in CheckpointStore. +func (manager *impl) CreateCheckpoint(checkpointKey string, checkpoint Checkpoint) error { + manager.mutex.Lock() + defer manager.mutex.Unlock() + blob, err := checkpoint.MarshalCheckpoint() + if err != nil { + return err + } + return manager.store.Write(checkpointKey, blob) +} + +// GetCheckpoint retrieves checkpoint from CheckpointStore. +func (manager *impl) GetCheckpoint(checkpointKey string, checkpoint Checkpoint) error { + manager.mutex.Lock() + defer manager.mutex.Unlock() + blob, err := manager.store.Read(checkpointKey) + if err != nil { + if err == utilstore.ErrKeyNotFound { + return errors.ErrCheckpointNotFound + } + return err + } + err = checkpoint.UnmarshalCheckpoint(blob) + if err == nil { + err = checkpoint.VerifyChecksum() + } + return err +} + +func (manager *impl) RemoveCheckpoint(checkpointKey string) error { + manager.mutex.Lock() + defer manager.mutex.Unlock() + return manager.store.Delete(checkpointKey) +} + +// ListCheckpoints returns the list of existing checkpoints. +func (manager *impl) ListCheckpoints() ([]string, error) { + manager.mutex.Lock() + defer manager.mutex.Unlock() + keys, err := manager.store.List() + if err != nil { + return []string{}, fmt.Errorf("failed to list checkpoint store: %v", err) + } + return keys, nil +} diff --git a/pkg/kubelet/checkpointmanager/checkpoint_manager_test.go b/pkg/kubelet/checkpointmanager/checkpoint_manager_test.go new file mode 100644 index 00000000000..4d02c4d5ac0 --- /dev/null +++ b/pkg/kubelet/checkpointmanager/checkpoint_manager_test.go @@ -0,0 +1,246 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package checkpointmanager + +import ( + "encoding/json" + "hash/fnv" + "sort" + "testing" + + "github.com/stretchr/testify/assert" + "k8s.io/kubernetes/pkg/kubelet/checkpointmanager/errors" + utilstore "k8s.io/kubernetes/pkg/kubelet/checkpointmanager/testing" + "k8s.io/kubernetes/pkg/kubelet/checkpointmanager/testing/example_checkpoint_formats/v1" +) + +var testStore *utilstore.MemStore + +type FakeCheckpoint interface { + Checkpoint + GetData() ([]*PortMapping, bool) +} + +// Data contains all types of data that can be stored in the checkpoint. +type Data struct { + PortMappings []*PortMapping `json:"port_mappings,omitempty"` + HostNetwork bool `json:"host_network,omitempty"` +} + +type CheckpointDataV2 struct { + PortMappings []*PortMapping `json:"port_mappings,omitempty"` + HostNetwork bool `json:"host_network,omitempty"` + V2Field string `json:"v2field"` +} + +type protocol string + +// portMapping is the port mapping configurations of a sandbox. +type portMapping struct { + // protocol of the port mapping. + Protocol *protocol + // Port number within the container. + ContainerPort *int32 + // Port number on the host. + HostPort *int32 +} + +// CheckpointData is a sample example structure to be used in test cases for checkpointing +type CheckpointData struct { + Version string + Name string + Data *Data + Checksum checksum.Checksum +} + +func newFakeCheckpointV1(name string, portMappings []*PortMapping, hostNetwork bool) FakeCheckpoint { + return &CheckpointData{ + Version: "v1", + Name: name, + Data: &Data{ + PortMappings: portMappings, + HostNetwork: hostNetwork, + }, + } +} + +func (cp *CheckpointData) MarshalCheckpoint() ([]byte, error) { + cp.Checksum = checksum.New(*cp.Data) + return json.Marshal(*cp) +} + +func (cp *CheckpointData) UnmarshalCheckpoint(blob []byte) error { + return json.Unmarshal(blob, cp) +} + +func (cp *CheckpointData) VerifyChecksum() error { + return cp.Checksum.Verify(*cp.Data) +} + +func (cp *CheckpointData) GetData() ([]*PortMapping, bool) { + return cp.Data.PortMappings, cp.Data.HostNetwork +} + +type checkpointDataV2 struct { + Version string + Name string + Data *CheckpointDataV2 + Checksum checksum.Checksum +} + +func newFakeCheckpointV2(name string, portMappings []*PortMapping, hostNetwork bool) FakeCheckpoint { + return &checkpointDataV2{ + Version: "v2", + Name: name, + Data: &CheckpointDataV2{ + PortMappings: portMappings, + HostNetwork: hostNetwork, + }, + } +} + +func newFakeCheckpointRemoteV1(name string, portMappings []*v1.PortMapping, hostNetwork bool) Checkpoint { + return &v1.CheckpointData{ + Version: "v1", + Name: name, + Data: &v1.Data{ + PortMappings: portMappings, + HostNetwork: hostNetwork, + }, + } +} + +func (cp *checkpointDataV2) MarshalCheckpoint() ([]byte, error) { + cp.Checksum = checksum.New(*cp.Data) + return json.Marshal(*cp) +} + +func (cp *checkpointDataV2) UnmarshalCheckpoint(blob []byte) error { + return json.Unmarshal(blob, cp) +} + +func (cp *checkpointDataV2) VerifyChecksum() error { + return cp.Checksum.Verify(*cp.Data) +} + +func (cp *checkpointDataV2) GetData() ([]*PortMapping, bool) { + return cp.Data.PortMappings, cp.Data.HostNetwork +} + +func newTestCheckpointManager() CheckpointManager { + return &impl{store: testStore} +} + +func TestCheckpointManager(t *testing.T) { + var err error + testStore = utilstore.NewMemStore() + manager := newTestCheckpointManager() + port80 := int32(80) + port443 := int32(443) + proto := protocol("tcp") + + portMappings := []*portMapping{ + { + &proto, + &port80, + &port80, + }, + { + &proto, + &port443, + &port443, + }, + } + checkpoint1 := newFakeCheckpointV1("check1", portMappings, true) + + checkpoints := []struct { + checkpointKey string + checkpoint FakeCheckpoint + expectHostNetwork bool + }{ + { + "key1", + checkpoint1, + true, + }, + { + "key2", + newFakeCheckpointV1("check2", nil, false), + false, + }, + } + + for _, tc := range checkpoints { + // Test CreateCheckpoints + err = manager.CreateCheckpoint(tc.checkpointKey, tc.checkpoint) + assert.NoError(t, err) + + // Test GetCheckpoints + checkpointOut := newFakeCheckpointV1("", nil, false) + err := manager.GetCheckpoint(tc.checkpointKey, checkpointOut) + assert.NoError(t, err) + actualPortMappings, actualHostNetwork := checkpointOut.GetData() + expPortMappings, expHostNetwork := tc.checkpoint.GetData() + assert.Equal(t, actualPortMappings, expPortMappings) + assert.Equal(t, actualHostNetwork, expHostNetwork) + } + // Test it fails if tried to read V1 structure into V2, a different structure from the structure which is checkpointed + checkpointV2 := newFakeCheckpointV2("", nil, false) + err = manager.GetCheckpoint("key1", checkpointV2) + assert.EqualError(t, err, "checkpoint is corrupted") + + // Test it fails if tried to read V1 structure into the same structure but defined in another package + checkpointRemoteV1 := newFakeCheckpointRemoteV1("", nil, false) + err = manager.GetCheckpoint("key1", checkpointRemoteV1) + assert.EqualError(t, err, "checkpoint is corrupted") + + // Test it works if tried to read V1 structure using into a new V1 structure + checkpointV1 := newFakeCheckpointV1("", nil, false) + err = manager.GetCheckpoint("key1", checkpointV1) + assert.NoError(t, err) + + // Test corrupt checksum case + checkpointOut := newFakeCheckpointV1("", nil, false) + blob, err := checkpointOut.MarshalCheckpoint() + assert.NoError(t, err) + testStore.Write("key1", blob) + err = manager.GetCheckpoint("key1", checkpoint1) + assert.EqualError(t, err, "checkpoint is corrupted") + + // Test ListCheckpoints + keys, err := manager.ListCheckpoints() + assert.NoError(t, err) + sort.Strings(keys) + assert.Equal(t, keys, []string{"key1", "key2"}) + + // Test RemoveCheckpoints + err = manager.RemoveCheckpoint("key1") + assert.NoError(t, err) + // Test Remove Nonexisted Checkpoints + err = manager.RemoveCheckpoint("key1") + assert.NoError(t, err) + + // Test ListCheckpoints + keys, err = manager.ListCheckpoints() + assert.NoError(t, err) + assert.Equal(t, keys, []string{"key2"}) + + // Test Get NonExisted Checkpoint + checkpointNE := newFakeCheckpointV1("NE", nil, false) + err = manager.GetCheckpoint("key1", checkpointNE) + assert.Error(t, err) +} diff --git a/pkg/kubelet/checkpointmanager/checksum/BUILD b/pkg/kubelet/checkpointmanager/checksum/BUILD new file mode 100644 index 00000000000..cb86b212baf --- /dev/null +++ b/pkg/kubelet/checkpointmanager/checksum/BUILD @@ -0,0 +1,26 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library") + +go_library( + name = "go_default_library", + srcs = ["checksum.go"], + importpath = "k8s.io/kubernetes/pkg/kubelet/checkpointmanager/checksum", + visibility = ["//visibility:public"], + deps = [ + "//pkg/kubelet/checkpointmanager/errors:go_default_library", + "//pkg/util/hash:go_default_library", + ], +) + +filegroup( + name = "package-srcs", + srcs = glob(["**"]), + tags = ["automanaged"], + visibility = ["//visibility:private"], +) + +filegroup( + name = "all-srcs", + srcs = [":package-srcs"], + tags = ["automanaged"], + visibility = ["//visibility:public"], +) diff --git a/pkg/kubelet/checkpointmanager/checksum/checksum.go b/pkg/kubelet/checkpointmanager/checksum/checksum.go new file mode 100644 index 00000000000..b2a98e45be4 --- /dev/null +++ b/pkg/kubelet/checkpointmanager/checksum/checksum.go @@ -0,0 +1,46 @@ +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package checksum + +import ( + "hash/fnv" + + "k8s.io/kubernetes/pkg/kubelet/checkpointmanager/errors" + hashutil "k8s.io/kubernetes/pkg/util/hash" +) + +// Data to be stored as checkpoint +type Checksum uint64 + +// VerifyChecksum verifies that passed checksum is same as calculated checksum +func (cs Checksum) Verify(data interface{}) error { + if cs != New(data) { + return errors.ErrCorruptCheckpoint + } + return nil +} + +func New(data interface{}) Checksum { + return Checksum(getChecksum(data)) +} + +// Get returns calculated checksum of checkpoint data +func getChecksum(data interface{}) uint64 { + hash := fnv.New32a() + hashutil.DeepHashObject(hash, data) + return uint64(hash.Sum32()) +} diff --git a/pkg/kubelet/dockershim/testing/BUILD b/pkg/kubelet/checkpointmanager/errors/BUILD similarity index 79% rename from pkg/kubelet/dockershim/testing/BUILD rename to pkg/kubelet/checkpointmanager/errors/BUILD index 0e57c9b081b..76c04fe9014 100644 --- a/pkg/kubelet/dockershim/testing/BUILD +++ b/pkg/kubelet/checkpointmanager/errors/BUILD @@ -7,8 +7,8 @@ load( go_library( name = "go_default_library", - srcs = ["util.go"], - importpath = "k8s.io/kubernetes/pkg/kubelet/dockershim/testing", + srcs = ["errors.go"], + importpath = "k8s.io/kubernetes/pkg/kubelet/checkpointmanager/errors", ) filegroup( diff --git a/pkg/kubelet/checkpointmanager/errors/errors.go b/pkg/kubelet/checkpointmanager/errors/errors.go new file mode 100644 index 00000000000..25462ffd9c8 --- /dev/null +++ b/pkg/kubelet/checkpointmanager/errors/errors.go @@ -0,0 +1,22 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package errors + +import "fmt" + +var CorruptCheckpointError = fmt.Errorf("checkpoint is corrupted.") +var CheckpointNotFoundError = fmt.Errorf("checkpoint is not found.") diff --git a/pkg/kubelet/checkpointmanager/testing/BUILD b/pkg/kubelet/checkpointmanager/testing/BUILD new file mode 100644 index 00000000000..51c6764dac9 --- /dev/null +++ b/pkg/kubelet/checkpointmanager/testing/BUILD @@ -0,0 +1,28 @@ +package(default_visibility = ["//visibility:public"]) + +load( + "@io_bazel_rules_go//go:def.bzl", + "go_library", +) + +go_library( + name = "go_default_library", + srcs = ["util.go"], + importpath = "k8s.io/kubernetes/pkg/kubelet/checkpointmanager/testing", +) + +filegroup( + name = "package-srcs", + srcs = glob(["**"]), + tags = ["automanaged"], + visibility = ["//visibility:private"], +) + +filegroup( + name = "all-srcs", + srcs = [ + ":package-srcs", + "//pkg/kubelet/checkpointmanager/testing/example_checkpoint_formats/v1:all-srcs", + ], + tags = ["automanaged"], +) diff --git a/pkg/kubelet/checkpointmanager/testing/example_checkpoint_formats/v1/BUILD b/pkg/kubelet/checkpointmanager/testing/example_checkpoint_formats/v1/BUILD new file mode 100644 index 00000000000..e8a30850f14 --- /dev/null +++ b/pkg/kubelet/checkpointmanager/testing/example_checkpoint_formats/v1/BUILD @@ -0,0 +1,23 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library") + +go_library( + name = "go_default_library", + srcs = ["types.go"], + importpath = "k8s.io/kubernetes/pkg/kubelet/checkpointmanager/testing/example_checkpoint_formats/v1", + visibility = ["//visibility:public"], + deps = ["//pkg/kubelet/checkpointmanager/checksum:go_default_library"], +) + +filegroup( + name = "package-srcs", + srcs = glob(["**"]), + tags = ["automanaged"], + visibility = ["//visibility:private"], +) + +filegroup( + name = "all-srcs", + srcs = [":package-srcs"], + tags = ["automanaged"], + visibility = ["//visibility:public"], +) diff --git a/pkg/kubelet/checkpointmanager/testing/example_checkpoint_formats/v1/types.go b/pkg/kubelet/checkpointmanager/testing/example_checkpoint_formats/v1/types.go new file mode 100644 index 00000000000..7a883d5b5c8 --- /dev/null +++ b/pkg/kubelet/checkpointmanager/testing/example_checkpoint_formats/v1/types.go @@ -0,0 +1,62 @@ +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package v1 + +import ( + "encoding/json" + + "k8s.io/kubernetes/pkg/kubelet/checkpointmanager/checksum" +) + +type protocol string + +// portMapping is the port mapping configurations of a sandbox. +type PortMapping struct { + // protocol of the port mapping. + Protocol *protocol + // Port number within the container. + ContainerPort *int32 + // Port number on the host. + HostPort *int32 +} + +// CheckpointData contains all types of data that can be stored in the checkpoint. +type Data struct { + PortMappings []*PortMapping `json:"port_mappings,omitempty"` + HostNetwork bool `json:"host_network,omitempty"` +} + +// CheckpointData is a sample example structure to be used in test cases for checkpointing +type CheckpointData struct { + Version string + Name string + Data *Data + Checksum checksum.Checksum +} + +func (cp *CheckpointData) MarshalCheckpoint() ([]byte, error) { + cp.Checksum = checksum.New(*cp.Data) + return json.Marshal(*cp) +} + +func (cp *CheckpointData) UnmarshalCheckpoint(blob []byte) error { + return json.Unmarshal(blob, cp) +} + +func (cp *CheckpointData) VerifyChecksum() error { + return cp.Checksum.Verify(*cp.Data) +} diff --git a/pkg/kubelet/dockershim/testing/util.go b/pkg/kubelet/checkpointmanager/testing/util.go similarity index 100% rename from pkg/kubelet/dockershim/testing/util.go rename to pkg/kubelet/checkpointmanager/testing/util.go diff --git a/pkg/kubelet/cm/devicemanager/BUILD b/pkg/kubelet/cm/devicemanager/BUILD index f23bfc488e2..ac2643dbb07 100644 --- a/pkg/kubelet/cm/devicemanager/BUILD +++ b/pkg/kubelet/cm/devicemanager/BUILD @@ -15,13 +15,14 @@ go_library( deps = [ "//pkg/apis/core/v1/helper:go_default_library", "//pkg/kubelet/apis/deviceplugin/v1beta1:go_default_library", + "//pkg/kubelet/checkpointmanager:go_default_library", + "//pkg/kubelet/checkpointmanager/errors:go_default_library", + "//pkg/kubelet/cm/devicemanager/checkpoint:go_default_library", "//pkg/kubelet/config:go_default_library", "//pkg/kubelet/container:go_default_library", "//pkg/kubelet/lifecycle:go_default_library", "//pkg/kubelet/metrics:go_default_library", - "//pkg/kubelet/util/store:go_default_library", "//pkg/scheduler/schedulercache:go_default_library", - "//pkg/util/filesystem:go_default_library", "//vendor/github.com/golang/glog:go_default_library", "//vendor/google.golang.org/grpc:go_default_library", "//vendor/k8s.io/api/core/v1:go_default_library", @@ -39,10 +40,9 @@ go_test( embed = [":go_default_library"], deps = [ "//pkg/kubelet/apis/deviceplugin/v1beta1:go_default_library", + "//pkg/kubelet/checkpointmanager:go_default_library", "//pkg/kubelet/lifecycle:go_default_library", - "//pkg/kubelet/util/store:go_default_library", "//pkg/scheduler/schedulercache:go_default_library", - "//pkg/util/filesystem:go_default_library", "//vendor/github.com/stretchr/testify/assert:go_default_library", "//vendor/github.com/stretchr/testify/require:go_default_library", "//vendor/k8s.io/api/core/v1:go_default_library", @@ -62,7 +62,10 @@ filegroup( filegroup( name = "all-srcs", - srcs = [":package-srcs"], + srcs = [ + ":package-srcs", + "//pkg/kubelet/cm/devicemanager/checkpoint:all-srcs", + ], tags = ["automanaged"], visibility = ["//visibility:public"], ) diff --git a/pkg/kubelet/cm/devicemanager/checkpoint/BUILD b/pkg/kubelet/cm/devicemanager/checkpoint/BUILD new file mode 100644 index 00000000000..91506a71f7c --- /dev/null +++ b/pkg/kubelet/cm/devicemanager/checkpoint/BUILD @@ -0,0 +1,26 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library") + +go_library( + name = "go_default_library", + srcs = ["checkpoint.go"], + importpath = "k8s.io/kubernetes/pkg/kubelet/cm/devicemanager/checkpoint", + visibility = ["//visibility:public"], + deps = [ + "//pkg/kubelet/checkpointmanager:go_default_library", + "//pkg/kubelet/checkpointmanager/checksum:go_default_library", + ], +) + +filegroup( + name = "package-srcs", + srcs = glob(["**"]), + tags = ["automanaged"], + visibility = ["//visibility:private"], +) + +filegroup( + name = "all-srcs", + srcs = [":package-srcs"], + tags = ["automanaged"], + visibility = ["//visibility:public"], +) diff --git a/pkg/kubelet/cm/devicemanager/checkpoint/checkpoint.go b/pkg/kubelet/cm/devicemanager/checkpoint/checkpoint.go new file mode 100644 index 00000000000..9f8cac139cd --- /dev/null +++ b/pkg/kubelet/cm/devicemanager/checkpoint/checkpoint.go @@ -0,0 +1,81 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package checkpoint + +import ( + "encoding/json" + + "k8s.io/kubernetes/pkg/kubelet/checkpointmanager" + "k8s.io/kubernetes/pkg/kubelet/checkpointmanager/checksum" +) + +type DeviceManagerCheckpoint interface { + checkpointmanager.Checkpoint + GetData() ([]PodDevicesEntry, map[string][]string) +} + +type PodDevicesEntry struct { + PodUID string + ContainerName string + ResourceName string + DeviceIDs []string + AllocResp []byte +} + +// checkpointData struct is used to store pod to device allocation information +// in a checkpoint file. +// TODO: add version control when we need to change checkpoint format. +type checkpointData struct { + PodDeviceEntries []PodDevicesEntry + RegisteredDevices map[string][]string +} + +type Data struct { + Data checkpointData + Checksum checksum.Checksum +} + +// NewDeviceManagerCheckpoint returns an instance of Checkpoint +func New(devEntries []PodDevicesEntry, + devices map[string][]string) DeviceManagerCheckpoint { + return &Data{ + Data: checkpointData{ + PodDeviceEntries: devEntries, + RegisteredDevices: devices, + }, + } +} + +// MarshalCheckpoint returns marshalled data +func (cp *Data) MarshalCheckpoint() ([]byte, error) { + cp.Checksum = checksum.New(cp.Data) + return json.Marshal(*cp) +} + +// UnmarshalCheckpoint returns unmarshalled data +func (cp *Data) UnmarshalCheckpoint(blob []byte) error { + return json.Unmarshal(blob, cp) +} + +// VerifyChecksum verifies that passed checksum is same as calculated checksum +func (cp *Data) VerifyChecksum() error { + return cp.Checksum.Verify(cp.Data) +} + +func (cp *Data) GetData() ([]PodDevicesEntry, map[string][]string) { + return cp.Data.PodDeviceEntries, cp.Data.RegisteredDevices +} diff --git a/pkg/kubelet/cm/devicemanager/manager.go b/pkg/kubelet/cm/devicemanager/manager.go index 6c82fbe53d9..f415181dffa 100644 --- a/pkg/kubelet/cm/devicemanager/manager.go +++ b/pkg/kubelet/cm/devicemanager/manager.go @@ -18,7 +18,6 @@ package devicemanager import ( "context" - "encoding/json" "fmt" "net" "os" @@ -34,12 +33,13 @@ import ( "k8s.io/apimachinery/pkg/util/sets" v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper" pluginapi "k8s.io/kubernetes/pkg/kubelet/apis/deviceplugin/v1beta1" + "k8s.io/kubernetes/pkg/kubelet/checkpointmanager" + "k8s.io/kubernetes/pkg/kubelet/checkpointmanager/errors" + "k8s.io/kubernetes/pkg/kubelet/cm/devicemanager/checkpoint" "k8s.io/kubernetes/pkg/kubelet/config" "k8s.io/kubernetes/pkg/kubelet/lifecycle" "k8s.io/kubernetes/pkg/kubelet/metrics" - utilstore "k8s.io/kubernetes/pkg/kubelet/util/store" "k8s.io/kubernetes/pkg/scheduler/schedulercache" - utilfs "k8s.io/kubernetes/pkg/util/filesystem" ) // ActivePodsFunc is a function that returns a list of pods to reconcile. @@ -83,9 +83,9 @@ type ManagerImpl struct { allocatedDevices map[string]sets.String // podDevices contains pod to allocated device mapping. - podDevices podDevices - store utilstore.Store - pluginOpts map[string]*pluginapi.DevicePluginOptions + podDevices podDevices + pluginOpts map[string]*pluginapi.DevicePluginOptions + checkpointManager checkpointmanager.CheckpointManager } type sourcesReadyStub struct{} @@ -122,11 +122,11 @@ func newManagerImpl(socketPath string) (*ManagerImpl, error) { // Before that, initializes them to perform no-op operations. manager.activePods = func() []*v1.Pod { return []*v1.Pod{} } manager.sourcesReady = &sourcesReadyStub{} - var err error - manager.store, err = utilstore.NewFileStore(dir, utilfs.DefaultFs{}) + checkpointManager, err := checkpointmanager.NewCheckpointManager(dir) if err != nil { - return nil, fmt.Errorf("failed to initialize device plugin checkpointing store: %+v", err) + return nil, fmt.Errorf("failed to initialize checkpoint manager: %+v", err) } + manager.checkpointManager = checkpointManager return manager, nil } @@ -454,33 +454,19 @@ func (m *ManagerImpl) GetCapacity() (v1.ResourceList, v1.ResourceList, []string) return capacity, allocatable, deletedResources.UnsortedList() } -// checkpointData struct is used to store pod to device allocation information -// and registered device information in a checkpoint file. -// TODO: add version control when we need to change checkpoint format. -type checkpointData struct { - PodDeviceEntries []podDevicesCheckpointEntry - RegisteredDevices map[string][]string -} - // Checkpoints device to container allocation information to disk. func (m *ManagerImpl) writeCheckpoint() error { m.mutex.Lock() - data := checkpointData{ - PodDeviceEntries: m.podDevices.toCheckpointData(), - RegisteredDevices: make(map[string][]string), - } + registeredDevs := make(map[string][]string) for resource, devices := range m.healthyDevices { - data.RegisteredDevices[resource] = devices.UnsortedList() + registeredDevs[resource] = devices.UnsortedList() } + data := checkpoint.New(m.podDevices.toCheckpointData(), + registeredDevs) m.mutex.Unlock() - - dataJSON, err := json.Marshal(data) + err := m.checkpointManager.CreateCheckpoint(kubeletDeviceManagerCheckpoint, data) if err != nil { - return err - } - err = m.store.Write(kubeletDeviceManagerCheckpoint, dataJSON) - if err != nil { - return fmt.Errorf("failed to write deviceplugin checkpoint file %q: %v", kubeletDeviceManagerCheckpoint, err) + return fmt.Errorf("failed to write checkpoint file %q: %v", kubeletDeviceManagerCheckpoint, err) } return nil } @@ -488,24 +474,23 @@ func (m *ManagerImpl) writeCheckpoint() error { // Reads device to container allocation information from disk, and populates // m.allocatedDevices accordingly. func (m *ManagerImpl) readCheckpoint() error { - content, err := m.store.Read(kubeletDeviceManagerCheckpoint) + registeredDevs := make(map[string][]string) + devEntries := make([]checkpoint.PodDevicesEntry, 0) + cp := checkpoint.New(devEntries, registeredDevs) + err := m.checkpointManager.GetCheckpoint(kubeletDeviceManagerCheckpoint, cp) if err != nil { - if err == utilstore.ErrKeyNotFound { + if err == errors.ErrCheckpointNotFound { + glog.Warningf("Failed to retrieve checkpoint for %q: %v", kubeletDeviceManagerCheckpoint, err) return nil } - return fmt.Errorf("failed to read checkpoint file %q: %v", kubeletDeviceManagerCheckpoint, err) + return err } - glog.V(4).Infof("Read checkpoint file %s\n", kubeletDeviceManagerCheckpoint) - var data checkpointData - if err := json.Unmarshal(content, &data); err != nil { - return fmt.Errorf("failed to unmarshal deviceplugin checkpoint data: %v", err) - } - m.mutex.Lock() defer m.mutex.Unlock() - m.podDevices.fromCheckpointData(data.PodDeviceEntries) + podDevices, registeredDevs := cp.GetData() + m.podDevices.fromCheckpointData(podDevices) m.allocatedDevices = m.podDevices.devices() - for resource := range data.RegisteredDevices { + for resource := range registeredDevs { // During start up, creates empty healthyDevices list so that the resource capacity // will stay zero till the corresponding device plugin re-registers. m.healthyDevices[resource] = sets.NewString() diff --git a/pkg/kubelet/cm/devicemanager/manager_test.go b/pkg/kubelet/cm/devicemanager/manager_test.go index b0ef2857fd1..66b63999ae3 100644 --- a/pkg/kubelet/cm/devicemanager/manager_test.go +++ b/pkg/kubelet/cm/devicemanager/manager_test.go @@ -34,10 +34,9 @@ import ( "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/uuid" pluginapi "k8s.io/kubernetes/pkg/kubelet/apis/deviceplugin/v1beta1" + "k8s.io/kubernetes/pkg/kubelet/checkpointmanager" "k8s.io/kubernetes/pkg/kubelet/lifecycle" - utilstore "k8s.io/kubernetes/pkg/kubelet/util/store" "k8s.io/kubernetes/pkg/scheduler/schedulercache" - utilfs "k8s.io/kubernetes/pkg/util/filesystem" ) const ( @@ -347,20 +346,19 @@ func constructAllocResp(devices, mounts, envs map[string]string) *pluginapi.Cont func TestCheckpoint(t *testing.T) { resourceName1 := "domain1.com/resource1" resourceName2 := "domain2.com/resource2" - as := assert.New(t) tmpDir, err := ioutil.TempDir("", "checkpoint") as.Nil(err) - defer os.RemoveAll(tmpDir) + ckm, err := checkpointmanager.NewCheckpointManager(tmpDir) + as.Nil(err) testManager := &ManagerImpl{ - socketdir: tmpDir, - endpoints: make(map[string]endpoint), - healthyDevices: make(map[string]sets.String), - unhealthyDevices: make(map[string]sets.String), - allocatedDevices: make(map[string]sets.String), - podDevices: make(podDevices), + endpoints: make(map[string]endpoint), + healthyDevices: make(map[string]sets.String), + unhealthyDevices: make(map[string]sets.String), + allocatedDevices: make(map[string]sets.String), + podDevices: make(podDevices), + checkpointManager: ckm, } - testManager.store, _ = utilstore.NewFileStore("/tmp/", utilfs.DefaultFs{}) testManager.podDevices.insert("pod1", "con1", resourceName1, constructDevices([]string{"dev1", "dev2"}), @@ -479,21 +477,25 @@ func makePod(limits v1.ResourceList) *v1.Pod { } } -func getTestManager(tmpDir string, activePods ActivePodsFunc, testRes []TestResource, opts map[string]*pluginapi.DevicePluginOptions) *ManagerImpl { +func getTestManager(tmpDir string, activePods ActivePodsFunc, testRes []TestResource, opts map[string]*pluginapi.DevicePluginOptions) (*ManagerImpl, error) { monitorCallback := func(resourceName string, added, updated, deleted []pluginapi.Device) {} - testManager := &ManagerImpl{ - socketdir: tmpDir, - callback: monitorCallback, - healthyDevices: make(map[string]sets.String), - unhealthyDevices: make(map[string]sets.String), - allocatedDevices: make(map[string]sets.String), - endpoints: make(map[string]endpoint), - pluginOpts: opts, - podDevices: make(podDevices), - activePods: activePods, - sourcesReady: &sourcesReadyStub{}, + ckm, err := checkpointmanager.NewCheckpointManager(tmpDir) + if err != nil { + return nil, err + } + testManager := &ManagerImpl{ + socketdir: tmpDir, + callback: monitorCallback, + healthyDevices: make(map[string]sets.String), + unhealthyDevices: make(map[string]sets.String), + allocatedDevices: make(map[string]sets.String), + endpoints: make(map[string]endpoint), + pluginOpts: opts, + podDevices: make(podDevices), + activePods: activePods, + sourcesReady: &sourcesReadyStub{}, + checkpointManager: ckm, } - testManager.store, _ = utilstore.NewFileStore("/tmp/", utilfs.DefaultFs{}) for _, res := range testRes { testManager.healthyDevices[res.resourceName] = sets.NewString() for _, dev := range res.devs { @@ -525,7 +527,7 @@ func getTestManager(tmpDir string, activePods ActivePodsFunc, testRes []TestReso } } } - return testManager + return testManager, nil } func getTestNodeInfo(allocatable v1.ResourceList) *schedulercache.NodeInfo { @@ -569,7 +571,8 @@ func TestPodContainerDeviceAllocation(t *testing.T) { defer os.RemoveAll(tmpDir) nodeInfo := getTestNodeInfo(v1.ResourceList{}) pluginOpts := make(map[string]*pluginapi.DevicePluginOptions) - testManager := getTestManager(tmpDir, podsStub.getActivePods, testResources, pluginOpts) + testManager, err := getTestManager(tmpDir, podsStub.getActivePods, testResources, pluginOpts) + as.Nil(err) testPods := []*v1.Pod{ makePod(v1.ResourceList{ @@ -664,7 +667,8 @@ func TestInitContainerDeviceAllocation(t *testing.T) { as.Nil(err) defer os.RemoveAll(tmpDir) pluginOpts := make(map[string]*pluginapi.DevicePluginOptions) - testManager := getTestManager(tmpDir, podsStub.getActivePods, testResources, pluginOpts) + testManager, err := getTestManager(tmpDir, podsStub.getActivePods, testResources, pluginOpts) + as.Nil(err) podWithPluginResourcesInInitContainers := &v1.Pod{ ObjectMeta: metav1.ObjectMeta{ @@ -742,14 +746,18 @@ func TestSanitizeNodeAllocatable(t *testing.T) { as := assert.New(t) monitorCallback := func(resourceName string, added, updated, deleted []pluginapi.Device) {} + tmpDir, err := ioutil.TempDir("", "checkpoint") + as.Nil(err) + ckm, err := checkpointmanager.NewCheckpointManager(tmpDir) + as.Nil(err) testManager := &ManagerImpl{ - callback: monitorCallback, - healthyDevices: make(map[string]sets.String), - allocatedDevices: make(map[string]sets.String), - podDevices: make(podDevices), + callback: monitorCallback, + allDevices: make(map[string]sets.String), + healthyDevices: make(map[string]sets.String), + podDevices: make(podDevices), + checkpointManager: ckm, } - testManager.store, _ = utilstore.NewFileStore("/tmp/", utilfs.DefaultFs{}) // require one of resource1 and one of resource2 testManager.allocatedDevices[resourceName1] = sets.NewString() testManager.allocatedDevices[resourceName1].Insert(devID1) @@ -796,7 +804,8 @@ func TestDevicePreStartContainer(t *testing.T) { pluginOpts := make(map[string]*pluginapi.DevicePluginOptions) pluginOpts[res1.resourceName] = &pluginapi.DevicePluginOptions{PreStartRequired: true} - testManager := getTestManager(tmpDir, podsStub.getActivePods, []TestResource{res1}, pluginOpts) + testManager, err := getTestManager(tmpDir, podsStub.getActivePods, []TestResource{res1}, pluginOpts) + as.Nil(err) ch := make(chan []string, 1) testManager.endpoints[res1.resourceName] = &MockEndpoint{ diff --git a/pkg/kubelet/cm/devicemanager/pod_devices.go b/pkg/kubelet/cm/devicemanager/pod_devices.go index eb20dc0a6e8..6dcb3a51a56 100644 --- a/pkg/kubelet/cm/devicemanager/pod_devices.go +++ b/pkg/kubelet/cm/devicemanager/pod_devices.go @@ -21,6 +21,7 @@ import ( "k8s.io/apimachinery/pkg/util/sets" pluginapi "k8s.io/kubernetes/pkg/kubelet/apis/deviceplugin/v1beta1" + "k8s.io/kubernetes/pkg/kubelet/cm/devicemanager/checkpoint" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" ) @@ -126,18 +127,9 @@ func (pdev podDevices) devices() map[string]sets.String { return ret } -// podDevicesCheckpointEntry is used to record to device allocation information. -type podDevicesCheckpointEntry struct { - PodUID string - ContainerName string - ResourceName string - DeviceIDs []string - AllocResp []byte -} - // Turns podDevices to checkpointData. -func (pdev podDevices) toCheckpointData() []podDevicesCheckpointEntry { - var data []podDevicesCheckpointEntry +func (pdev podDevices) toCheckpointData() []checkpoint.PodDevicesEntry { + var data []checkpoint.PodDevicesEntry for podUID, containerDevices := range pdev { for conName, resources := range containerDevices { for resource, devices := range resources { @@ -152,7 +144,7 @@ func (pdev podDevices) toCheckpointData() []podDevicesCheckpointEntry { glog.Errorf("Can't marshal allocResp for %v %v %v: %v", podUID, conName, resource, err) continue } - data = append(data, podDevicesCheckpointEntry{podUID, conName, resource, devIds, allocResp}) + data = append(data, checkpoint.PodDevicesEntry{podUID, conName, resource, devIds, allocResp}) } } } @@ -160,7 +152,7 @@ func (pdev podDevices) toCheckpointData() []podDevicesCheckpointEntry { } // Populates podDevices from the passed in checkpointData. -func (pdev podDevices) fromCheckpointData(data []podDevicesCheckpointEntry) { +func (pdev podDevices) fromCheckpointData(data []checkpoint.PodDevicesEntry) { for _, entry := range data { glog.V(2).Infof("Get checkpoint entry: %v %v %v %v %v\n", entry.PodUID, entry.ContainerName, entry.ResourceName, entry.DeviceIDs, entry.AllocResp) diff --git a/pkg/kubelet/dockershim/BUILD b/pkg/kubelet/dockershim/BUILD index ccdc7ece9ea..fa8ab7d6a9c 100644 --- a/pkg/kubelet/dockershim/BUILD +++ b/pkg/kubelet/dockershim/BUILD @@ -82,6 +82,8 @@ go_library( "//pkg/credentialprovider:go_default_library", "//pkg/kubelet/apis/cri/runtime/v1alpha2:go_default_library", "//pkg/kubelet/apis/kubeletconfig:go_default_library", + "//pkg/kubelet/checkpointmanager:go_default_library", + "//pkg/kubelet/checkpointmanager/checksum:go_default_library", "//pkg/kubelet/cm:go_default_library", "//pkg/kubelet/container:go_default_library", "//pkg/kubelet/dockershim/cm:go_default_library", @@ -100,8 +102,6 @@ go_library( "//pkg/kubelet/util/ioutils:go_default_library", "//pkg/kubelet/util/store:go_default_library", "//pkg/security/apparmor:go_default_library", - "//pkg/util/filesystem:go_default_library", - "//pkg/util/hash:go_default_library", "//pkg/util/parsers:go_default_library", "//vendor/github.com/armon/circbuf:go_default_library", "//vendor/github.com/blang/semver:go_default_library", @@ -149,6 +149,7 @@ go_test( embed = [":go_default_library"], deps = [ "//pkg/kubelet/apis/cri/runtime/v1alpha2:go_default_library", + "//pkg/kubelet/checkpointmanager:go_default_library", "//pkg/kubelet/container:go_default_library", "//pkg/kubelet/container/testing:go_default_library", "//pkg/kubelet/dockershim/libdocker:go_default_library", @@ -186,7 +187,6 @@ filegroup( "//pkg/kubelet/dockershim/metrics:all-srcs", "//pkg/kubelet/dockershim/network:all-srcs", "//pkg/kubelet/dockershim/remote:all-srcs", - "//pkg/kubelet/dockershim/testing:all-srcs", ], tags = ["automanaged"], visibility = ["//visibility:public"], diff --git a/pkg/kubelet/dockershim/convert.go b/pkg/kubelet/dockershim/convert.go index f1c9b0e852f..4f5621d24d9 100644 --- a/pkg/kubelet/dockershim/convert.go +++ b/pkg/kubelet/dockershim/convert.go @@ -164,13 +164,14 @@ func containerToRuntimeAPISandbox(c *dockertypes.Container) (*runtimeapi.PodSand }, nil } -func checkpointToRuntimeAPISandbox(id string, checkpoint *PodSandboxCheckpoint) *runtimeapi.PodSandbox { +func checkpointToRuntimeAPISandbox(id string, checkpoint DockershimCheckpoint) *runtimeapi.PodSandbox { state := runtimeapi.PodSandboxState_SANDBOX_NOTREADY + _, name, namespace, _, _ := checkpoint.GetData() return &runtimeapi.PodSandbox{ Id: id, Metadata: &runtimeapi.PodSandboxMetadata{ - Name: checkpoint.Name, - Namespace: checkpoint.Namespace, + Name: name, + Namespace: namespace, }, State: state, } diff --git a/pkg/kubelet/dockershim/docker_checkpoint.go b/pkg/kubelet/dockershim/docker_checkpoint.go index f474696995b..8bfa1a77822 100644 --- a/pkg/kubelet/dockershim/docker_checkpoint.go +++ b/pkg/kubelet/dockershim/docker_checkpoint.go @@ -18,14 +18,9 @@ package dockershim import ( "encoding/json" - "fmt" - "hash/fnv" - "path/filepath" - "github.com/golang/glog" - utilstore "k8s.io/kubernetes/pkg/kubelet/util/store" - utilfs "k8s.io/kubernetes/pkg/util/filesystem" - hashutil "k8s.io/kubernetes/pkg/util/hash" + "k8s.io/kubernetes/pkg/kubelet/checkpointmanager" + "k8s.io/kubernetes/pkg/kubelet/checkpointmanager/checksum" ) const ( @@ -36,6 +31,11 @@ const ( schemaVersion = "v1" ) +type DockershimCheckpoint interface { + checkpointmanager.Checkpoint + GetData() (string, string, string, []*PortMapping, bool) +} + type Protocol string // PortMapping is the port mapping configurations of a sandbox. @@ -65,89 +65,31 @@ type PodSandboxCheckpoint struct { // Data to checkpoint for pod sandbox. Data *CheckpointData `json:"data,omitempty"` // Checksum is calculated with fnv hash of the checkpoint object with checksum field set to be zero - CheckSum uint64 `json:"checksum"` + Checksum checksum.Checksum `json:"checksum"` } -// CheckpointHandler provides the interface to manage PodSandbox checkpoint -type CheckpointHandler interface { - // CreateCheckpoint persists sandbox checkpoint in CheckpointStore. - CreateCheckpoint(podSandboxID string, checkpoint *PodSandboxCheckpoint) error - // GetCheckpoint retrieves sandbox checkpoint from CheckpointStore. - GetCheckpoint(podSandboxID string) (*PodSandboxCheckpoint, error) - // RemoveCheckpoint removes sandbox checkpoint form CheckpointStore. - // WARNING: RemoveCheckpoint will not return error if checkpoint does not exist. - RemoveCheckpoint(podSandboxID string) error - // ListCheckpoint returns the list of existing checkpoints. - ListCheckpoints() ([]string, error) -} - -// PersistentCheckpointHandler is an implementation of CheckpointHandler. It persists checkpoint in CheckpointStore -type PersistentCheckpointHandler struct { - store utilstore.Store -} - -func NewPersistentCheckpointHandler(dockershimRootDir string) (CheckpointHandler, error) { - fstore, err := utilstore.NewFileStore(filepath.Join(dockershimRootDir, sandboxCheckpointDir), utilfs.DefaultFs{}) - if err != nil { - return nil, err - } - return &PersistentCheckpointHandler{store: fstore}, nil -} - -func (handler *PersistentCheckpointHandler) CreateCheckpoint(podSandboxID string, checkpoint *PodSandboxCheckpoint) error { - checkpoint.CheckSum = calculateChecksum(*checkpoint) - blob, err := json.Marshal(checkpoint) - if err != nil { - return err - } - return handler.store.Write(podSandboxID, blob) -} - -func (handler *PersistentCheckpointHandler) GetCheckpoint(podSandboxID string) (*PodSandboxCheckpoint, error) { - blob, err := handler.store.Read(podSandboxID) - if err != nil { - return nil, err - } - var checkpoint PodSandboxCheckpoint - //TODO: unmarhsal into a struct with just Version, check version, unmarshal into versioned type. - err = json.Unmarshal(blob, &checkpoint) - if err != nil { - glog.Errorf("Failed to unmarshal checkpoint %q, removing checkpoint. Checkpoint content: %q. ErrMsg: %v", podSandboxID, string(blob), err) - handler.RemoveCheckpoint(podSandboxID) - return nil, fmt.Errorf("failed to unmarshal checkpoint") - } - if checkpoint.CheckSum != calculateChecksum(checkpoint) { - glog.Errorf("Checksum of checkpoint %q is not valid, removing checkpoint", podSandboxID) - handler.RemoveCheckpoint(podSandboxID) - return nil, fmt.Errorf("checkpoint is corrupted") - } - return &checkpoint, nil -} - -func (handler *PersistentCheckpointHandler) RemoveCheckpoint(podSandboxID string) error { - return handler.store.Delete(podSandboxID) -} - -func (handler *PersistentCheckpointHandler) ListCheckpoints() ([]string, error) { - keys, err := handler.store.List() - if err != nil { - return []string{}, fmt.Errorf("failed to list checkpoint store: %v", err) - } - return keys, nil -} - -func NewPodSandboxCheckpoint(namespace, name string) *PodSandboxCheckpoint { +func NewPodSandboxCheckpoint(namespace, name string, data *CheckpointData) DockershimCheckpoint { return &PodSandboxCheckpoint{ Version: schemaVersion, Namespace: namespace, Name: name, - Data: &CheckpointData{}, + Data: data, } } -func calculateChecksum(checkpoint PodSandboxCheckpoint) uint64 { - checkpoint.CheckSum = 0 - hash := fnv.New32a() - hashutil.DeepHashObject(hash, checkpoint) - return uint64(hash.Sum32()) +func (cp *PodSandboxCheckpoint) MarshalCheckpoint() ([]byte, error) { + cp.Checksum = checksum.New(*cp.Data) + return json.Marshal(*cp) +} + +func (cp *PodSandboxCheckpoint) UnmarshalCheckpoint(blob []byte) error { + return json.Unmarshal(blob, cp) +} + +func (cp *PodSandboxCheckpoint) VerifyChecksum() error { + return cp.Checksum.Verify(*cp.Data) +} + +func (cp *PodSandboxCheckpoint) GetData() (string, string, string, []*PortMapping, bool) { + return cp.Version, cp.Name, cp.Namespace, cp.Data.PortMappings, cp.Data.HostNetwork } diff --git a/pkg/kubelet/dockershim/docker_checkpoint_test.go b/pkg/kubelet/dockershim/docker_checkpoint_test.go index c10b8f1e502..49f4e4d174a 100644 --- a/pkg/kubelet/dockershim/docker_checkpoint_test.go +++ b/pkg/kubelet/dockershim/docker_checkpoint_test.go @@ -17,86 +17,17 @@ limitations under the License. package dockershim import ( - "sort" "testing" "github.com/stretchr/testify/assert" - utilstore "k8s.io/kubernetes/pkg/kubelet/dockershim/testing" ) -func NewTestPersistentCheckpointHandler() CheckpointHandler { - return &PersistentCheckpointHandler{store: utilstore.NewMemStore()} -} - -func TestPersistentCheckpointHandler(t *testing.T) { - var err error - handler := NewTestPersistentCheckpointHandler() - port80 := int32(80) - port443 := int32(443) - proto := protocolTCP - - checkpoint1 := NewPodSandboxCheckpoint("ns1", "sandbox1") - checkpoint1.Data.PortMappings = []*PortMapping{ - { - &proto, - &port80, - &port80, - }, - { - &proto, - &port443, - &port443, - }, - } - checkpoint1.Data.HostNetwork = true - - checkpoints := []struct { - podSandboxID string - checkpoint *PodSandboxCheckpoint - expectHostNetwork bool - }{ - { - "id1", - checkpoint1, - true, - }, - { - "id2", - NewPodSandboxCheckpoint("ns2", "sandbox2"), - false, - }, - } - - for _, tc := range checkpoints { - // Test CreateCheckpoints - err = handler.CreateCheckpoint(tc.podSandboxID, tc.checkpoint) - assert.NoError(t, err) - - // Test GetCheckpoints - checkpoint, err := handler.GetCheckpoint(tc.podSandboxID) - assert.NoError(t, err) - assert.Equal(t, *checkpoint, *tc.checkpoint) - assert.Equal(t, checkpoint.Data.HostNetwork, tc.expectHostNetwork) - } - // Test ListCheckpoints - keys, err := handler.ListCheckpoints() - assert.NoError(t, err) - sort.Strings(keys) - assert.Equal(t, keys, []string{"id1", "id2"}) - - // Test RemoveCheckpoints - err = handler.RemoveCheckpoint("id1") - assert.NoError(t, err) - // Test Remove Nonexisted Checkpoints - err = handler.RemoveCheckpoint("id1") - assert.NoError(t, err) - - // Test ListCheckpoints - keys, err = handler.ListCheckpoints() - assert.NoError(t, err) - assert.Equal(t, keys, []string{"id2"}) - - // Test Get NonExisted Checkpoint - _, err = handler.GetCheckpoint("id1") - assert.Error(t, err) +func TestPodSandboxCheckpoint(t *testing.T) { + data := &CheckpointData{HostNetwork: true} + checkpoint := NewPodSandboxCheckpoint("ns1", "sandbox1", data) + version, name, namespace, _, hostNetwork := checkpoint.GetData() + assert.Equal(t, schemaVersion, version) + assert.Equal(t, "ns1", namespace) + assert.Equal(t, "sandbox1", name) + assert.Equal(t, true, hostNetwork) } diff --git a/pkg/kubelet/dockershim/docker_sandbox.go b/pkg/kubelet/dockershim/docker_sandbox.go index 3e71d5506b4..a9bd36789b4 100644 --- a/pkg/kubelet/dockershim/docker_sandbox.go +++ b/pkg/kubelet/dockershim/docker_sandbox.go @@ -30,6 +30,7 @@ import ( utilerrors "k8s.io/apimachinery/pkg/util/errors" runtimeapi "k8s.io/kubernetes/pkg/kubelet/apis/cri/runtime/v1alpha2" + "k8s.io/kubernetes/pkg/kubelet/checkpointmanager" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" "k8s.io/kubernetes/pkg/kubelet/dockershim/libdocker" "k8s.io/kubernetes/pkg/kubelet/qos" @@ -118,7 +119,7 @@ func (ds *dockerService) RunPodSandbox(ctx context.Context, r *runtimeapi.RunPod }(&err) // Step 3: Create Sandbox Checkpoint. - if err = ds.checkpointHandler.CreateCheckpoint(createResp.ID, constructPodSandboxCheckpoint(config)); err != nil { + if err = ds.checkpointManager.CreateCheckpoint(createResp.ID, constructPodSandboxCheckpoint(config)); err != nil { return nil, err } @@ -189,7 +190,8 @@ func (ds *dockerService) StopPodSandbox(ctx context.Context, r *runtimeapi.StopP name = metadata.Name hostNetwork = (networkNamespaceMode(inspectResult) == runtimeapi.NamespaceMode_NODE) } else { - checkpoint, checkpointErr := ds.checkpointHandler.GetCheckpoint(podSandboxID) + checkpoint := NewPodSandboxCheckpoint("", "", &CheckpointData{}) + checkpointErr := ds.checkpointManager.GetCheckpoint(podSandboxID, checkpoint) // Proceed if both sandbox container and checkpoint could not be found. This means that following // actions will only have sandbox ID and not have pod namespace and name information. @@ -204,9 +206,7 @@ func (ds *dockerService) StopPodSandbox(ctx context.Context, r *runtimeapi.StopP fmt.Errorf("failed to get sandbox status: %v", statusErr)}) } } else { - namespace = checkpoint.Namespace - name = checkpoint.Name - hostNetwork = checkpoint.Data != nil && checkpoint.Data.HostNetwork + _, name, namespace, _, hostNetwork = checkpoint.GetData() } } @@ -237,7 +237,7 @@ func (ds *dockerService) StopPodSandbox(ctx context.Context, r *runtimeapi.StopP errList = append(errList, err) } else { // remove the checkpoint for any sandbox that is not found in the runtime - ds.checkpointHandler.RemoveCheckpoint(podSandboxID) + ds.checkpointManager.RemoveCheckpoint(podSandboxID) } } @@ -284,7 +284,7 @@ func (ds *dockerService) RemovePodSandbox(ctx context.Context, r *runtimeapi.Rem } // Remove the checkpoint of the sandbox. - if err := ds.checkpointHandler.RemoveCheckpoint(podSandboxID); err != nil { + if err := ds.checkpointManager.RemoveCheckpoint(podSandboxID); err != nil { errs = append(errs, err) } if len(errs) == 0 { @@ -465,7 +465,7 @@ func (ds *dockerService) ListPodSandbox(_ context.Context, r *runtimeapi.ListPod var err error checkpoints := []string{} if filter == nil { - checkpoints, err = ds.checkpointHandler.ListCheckpoints() + checkpoints, err = ds.checkpointManager.ListCheckpoints() if err != nil { glog.Errorf("Failed to list checkpoints: %v", err) } @@ -501,7 +501,8 @@ func (ds *dockerService) ListPodSandbox(_ context.Context, r *runtimeapi.ListPod if _, ok := sandboxIDs[id]; ok { continue } - checkpoint, err := ds.checkpointHandler.GetCheckpoint(id) + checkpoint := NewPodSandboxCheckpoint("", "", &CheckpointData{}) + err := ds.checkpointManager.GetCheckpoint(id, checkpoint) if err != nil { glog.Errorf("Failed to retrieve checkpoint for sandbox %q: %v", id, err) continue @@ -624,20 +625,20 @@ func ipcNamespaceMode(container *dockertypes.ContainerJSON) runtimeapi.Namespace return runtimeapi.NamespaceMode_POD } -func constructPodSandboxCheckpoint(config *runtimeapi.PodSandboxConfig) *PodSandboxCheckpoint { - checkpoint := NewPodSandboxCheckpoint(config.Metadata.Namespace, config.Metadata.Name) +func constructPodSandboxCheckpoint(config *runtimeapi.PodSandboxConfig) checkpointmanager.Checkpoint { + data := CheckpointData{} for _, pm := range config.GetPortMappings() { proto := toCheckpointProtocol(pm.Protocol) - checkpoint.Data.PortMappings = append(checkpoint.Data.PortMappings, &PortMapping{ + data.PortMappings = append(data.PortMappings, &PortMapping{ HostPort: &pm.HostPort, ContainerPort: &pm.ContainerPort, Protocol: &proto, }) } if config.GetLinux().GetSecurityContext().GetNamespaceOptions().GetNetwork() == runtimeapi.NamespaceMode_NODE { - checkpoint.Data.HostNetwork = true + data.HostNetwork = true } - return checkpoint + return NewPodSandboxCheckpoint(config.Metadata.Namespace, config.Metadata.Name, &data) } func toCheckpointProtocol(protocol runtimeapi.Protocol) Protocol { diff --git a/pkg/kubelet/dockershim/docker_service.go b/pkg/kubelet/dockershim/docker_service.go index fe8e84d1240..1b6f886d94a 100644 --- a/pkg/kubelet/dockershim/docker_service.go +++ b/pkg/kubelet/dockershim/docker_service.go @@ -20,6 +20,7 @@ import ( "context" "fmt" "net/http" + "path/filepath" "sync" "time" @@ -30,6 +31,7 @@ import ( "k8s.io/api/core/v1" runtimeapi "k8s.io/kubernetes/pkg/kubelet/apis/cri/runtime/v1alpha2" "k8s.io/kubernetes/pkg/kubelet/apis/kubeletconfig" + "k8s.io/kubernetes/pkg/kubelet/checkpointmanager" kubecm "k8s.io/kubernetes/pkg/kubelet/cm" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" "k8s.io/kubernetes/pkg/kubelet/dockershim/cm" @@ -191,7 +193,8 @@ func NewDockerService(config *ClientConfig, podSandboxImage string, streamingCon client := NewDockerClientFromConfig(config) c := libdocker.NewInstrumentedInterface(client) - checkpointHandler, err := NewPersistentCheckpointHandler(dockershimRootDir) + + checkpointManager, err := checkpointmanager.NewCheckpointManager(filepath.Join(dockershimRootDir, sandboxCheckpointDir)) if err != nil { return nil, err } @@ -205,7 +208,7 @@ func NewDockerService(config *ClientConfig, podSandboxImage string, streamingCon execHandler: &NativeExecHandler{}, }, containerManager: cm.NewContainerManager(cgroupsName, client), - checkpointHandler: checkpointHandler, + checkpointManager: checkpointManager, disableSharedPID: disableSharedPID, networkReady: make(map[string]bool), } @@ -293,7 +296,7 @@ type dockerService struct { containerManager cm.ContainerManager // cgroup driver used by Docker runtime. cgroupDriver string - checkpointHandler CheckpointHandler + checkpointManager checkpointmanager.CheckpointManager // caches the version of the runtime. // To be compatible with multiple docker versions, we need to perform // version checking for some operations. Use this cache to avoid querying @@ -365,7 +368,8 @@ func (ds *dockerService) GetNetNS(podSandboxID string) (string, error) { // GetPodPortMappings returns the port mappings of the given podSandbox ID. func (ds *dockerService) GetPodPortMappings(podSandboxID string) ([]*hostport.PortMapping, error) { // TODO: get portmappings from docker labels for backward compatibility - checkpoint, err := ds.checkpointHandler.GetCheckpoint(podSandboxID) + checkpoint := NewPodSandboxCheckpoint("", "", &CheckpointData{}) + err := ds.checkpointManager.GetCheckpoint(podSandboxID, checkpoint) // Return empty portMappings if checkpoint is not found if err != nil { if err == utilstore.ErrKeyNotFound { @@ -373,9 +377,9 @@ func (ds *dockerService) GetPodPortMappings(podSandboxID string) ([]*hostport.Po } return nil, err } - - portMappings := make([]*hostport.PortMapping, 0, len(checkpoint.Data.PortMappings)) - for _, pm := range checkpoint.Data.PortMappings { + _, _, _, checkpointedPortMappings, _ := checkpoint.GetData() + portMappings := make([]*hostport.PortMapping, 0, len(checkpointedPortMappings)) + for _, pm := range checkpointedPortMappings { proto := toAPIProtocol(*pm.Protocol) portMappings = append(portMappings, &hostport.PortMapping{ HostPort: *pm.HostPort, diff --git a/pkg/kubelet/dockershim/docker_service_test.go b/pkg/kubelet/dockershim/docker_service_test.go index 0f9724e224d..2b223389205 100644 --- a/pkg/kubelet/dockershim/docker_service_test.go +++ b/pkg/kubelet/dockershim/docker_service_test.go @@ -30,6 +30,7 @@ import ( "k8s.io/apimachinery/pkg/util/clock" runtimeapi "k8s.io/kubernetes/pkg/kubelet/apis/cri/runtime/v1alpha2" + "k8s.io/kubernetes/pkg/kubelet/checkpointmanager" containertest "k8s.io/kubernetes/pkg/kubelet/container/testing" "k8s.io/kubernetes/pkg/kubelet/dockershim/libdocker" "k8s.io/kubernetes/pkg/kubelet/dockershim/network" @@ -43,15 +44,50 @@ func newTestNetworkPlugin(t *testing.T) *nettest.MockNetworkPlugin { return nettest.NewMockNetworkPlugin(ctrl) } +type mockCheckpointManager struct { + checkpoint map[string]*PodSandboxCheckpoint +} + +func (ckm *mockCheckpointManager) CreateCheckpoint(checkpointKey string, checkpoint checkpointmanager.Checkpoint) error { + ckm.checkpoint[checkpointKey] = checkpoint.(*PodSandboxCheckpoint) + return nil +} + +func (ckm *mockCheckpointManager) GetCheckpoint(checkpointKey string, checkpoint checkpointmanager.Checkpoint) error { + *(checkpoint.(*PodSandboxCheckpoint)) = *(ckm.checkpoint[checkpointKey]) + return nil +} + +func (ckm *mockCheckpointManager) RemoveCheckpoint(checkpointKey string) error { + _, ok := ckm.checkpoint[checkpointKey] + if ok { + delete(ckm.checkpoint, "moo") + } + return nil +} + +func (ckm *mockCheckpointManager) ListCheckpoints() ([]string, error) { + var keys []string + for key, _ := range ckm.checkpoint { + keys = append(keys, key) + } + return keys, nil +} + +func newMockCheckpointManager() checkpointmanager.CheckpointManager { + return &mockCheckpointManager{checkpoint: make(map[string]*PodSandboxCheckpoint)} +} + func newTestDockerService() (*dockerService, *libdocker.FakeDockerClient, *clock.FakeClock) { fakeClock := clock.NewFakeClock(time.Time{}) c := libdocker.NewFakeDockerClient().WithClock(fakeClock).WithVersion("1.11.2", "1.23").WithRandSource(rand.NewSource(0)) pm := network.NewPluginManager(&network.NoopNetworkPlugin{}) + ckm := newMockCheckpointManager() return &dockerService{ client: c, os: &containertest.FakeOS{}, network: pm, - checkpointHandler: NewTestPersistentCheckpointHandler(), + checkpointManager: ckm, networkReady: make(map[string]bool), }, c, fakeClock } From cedbd932550d15a3e649ddb02ac74784ade32f06 Mon Sep 17 00:00:00 2001 From: vikaschoudhary16 Date: Wed, 29 Nov 2017 13:19:39 -0500 Subject: [PATCH 2/2] Make 'pod' package to use unified checkpointManager Signed-off-by: vikaschoudhary16 --- hack/.golint_failures | 2 +- pkg/kubelet/BUILD | 1 + pkg/kubelet/checkpoint/BUILD | 6 +- pkg/kubelet/checkpoint/checkpoint.go | 139 ++++++++---------- pkg/kubelet/checkpoint/checkpoint_test.go | 12 +- pkg/kubelet/checkpointmanager/BUILD | 2 +- pkg/kubelet/checkpointmanager/README.md | 3 +- .../checkpointmanager/checkpoint_manager.go | 4 + .../checkpoint_manager_test.go | 7 +- .../checkpointmanager/errors/errors.go | 7 +- pkg/kubelet/checkpointmanager/testing/util.go | 5 + pkg/kubelet/cm/devicemanager/manager_test.go | 4 +- pkg/kubelet/cm/devicemanager/pod_devices.go | 7 +- pkg/kubelet/config/BUILD | 1 + pkg/kubelet/config/config.go | 13 +- pkg/kubelet/dockershim/BUILD | 1 - pkg/kubelet/dockershim/docker_service_test.go | 2 +- pkg/kubelet/kubelet.go | 10 +- pkg/kubelet/kubelet_test.go | 2 +- pkg/kubelet/mountpod/mount_pod_test.go | 2 +- pkg/kubelet/pod/BUILD | 1 + pkg/kubelet/pod/pod_manager.go | 11 +- pkg/kubelet/pod/pod_manager_test.go | 2 +- pkg/kubelet/pod/testing/BUILD | 2 + pkg/kubelet/pod/testing/fake_mirror_client.go | 36 +++++ pkg/kubelet/prober/common_test.go | 2 +- pkg/kubelet/prober/worker_test.go | 2 +- pkg/kubelet/runonce_test.go | 2 +- pkg/kubelet/status/status_manager_test.go | 2 +- .../desired_state_of_world_populator_test.go | 2 +- .../volumemanager/volume_manager_test.go | 9 +- 31 files changed, 176 insertions(+), 125 deletions(-) diff --git a/hack/.golint_failures b/hack/.golint_failures index 824de2477d9..ef0253f94bb 100644 --- a/hack/.golint_failures +++ b/hack/.golint_failures @@ -168,6 +168,7 @@ pkg/kubelet/checkpointmanager/checksum pkg/kubelet/checkpointmanager/testing/example_checkpoint_formats/v1 pkg/kubelet/client pkg/kubelet/cm +pkg/kubelet/cm/devicemanager/checkpoint pkg/kubelet/cm/util pkg/kubelet/config pkg/kubelet/configmap @@ -184,7 +185,6 @@ pkg/kubelet/dockershim/network/hostport pkg/kubelet/dockershim/network/hostport/testing pkg/kubelet/dockershim/network/kubenet pkg/kubelet/dockershim/network/testing -pkg/kubelet/dockershim/testing pkg/kubelet/events pkg/kubelet/images pkg/kubelet/kuberuntime diff --git a/pkg/kubelet/BUILD b/pkg/kubelet/BUILD index b7657b78239..d2b6099c8f9 100644 --- a/pkg/kubelet/BUILD +++ b/pkg/kubelet/BUILD @@ -46,6 +46,7 @@ go_library( "//pkg/kubelet/apis/kubeletconfig:go_default_library", "//pkg/kubelet/cadvisor:go_default_library", "//pkg/kubelet/certificate:go_default_library", + "//pkg/kubelet/checkpointmanager:go_default_library", "//pkg/kubelet/cm:go_default_library", "//pkg/kubelet/config:go_default_library", "//pkg/kubelet/configmap:go_default_library", diff --git a/pkg/kubelet/checkpoint/BUILD b/pkg/kubelet/checkpoint/BUILD index c04d198192c..0be2f14b41c 100644 --- a/pkg/kubelet/checkpoint/BUILD +++ b/pkg/kubelet/checkpoint/BUILD @@ -7,9 +7,8 @@ go_library( visibility = ["//visibility:public"], deps = [ "//pkg/apis/core:go_default_library", - "//pkg/volume/util:go_default_library", - "//vendor/github.com/dchest/safefile:go_default_library", - "//vendor/github.com/ghodss/yaml:go_default_library", + "//pkg/kubelet/checkpointmanager:go_default_library", + "//pkg/kubelet/checkpointmanager/checksum:go_default_library", "//vendor/github.com/golang/glog:go_default_library", "//vendor/k8s.io/api/core/v1:go_default_library", ], @@ -21,6 +20,7 @@ go_test( embed = [":go_default_library"], deps = [ "//pkg/apis/core:go_default_library", + "//pkg/kubelet/checkpointmanager:go_default_library", "//vendor/k8s.io/api/core/v1:go_default_library", "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", ], diff --git a/pkg/kubelet/checkpoint/checkpoint.go b/pkg/kubelet/checkpoint/checkpoint.go index d4b30f902d9..e5801312d6a 100644 --- a/pkg/kubelet/checkpoint/checkpoint.go +++ b/pkg/kubelet/checkpoint/checkpoint.go @@ -17,20 +17,15 @@ limitations under the License. package checkpoint import ( + "encoding/json" "fmt" - "io/ioutil" - "os" - "path/filepath" - "strings" - "sync" - "github.com/dchest/safefile" - "github.com/ghodss/yaml" "github.com/golang/glog" "k8s.io/api/core/v1" "k8s.io/kubernetes/pkg/apis/core" - "k8s.io/kubernetes/pkg/volume/util" + "k8s.io/kubernetes/pkg/kubelet/checkpointmanager" + "k8s.io/kubernetes/pkg/kubelet/checkpointmanager/checksum" ) const ( @@ -39,54 +34,44 @@ const ( podPrefix = "Pod" ) -// Manager is the interface used to manage checkpoints -// which involves writing resources to disk to recover -// during restart or failure scenarios. -// https://github.com/kubernetes/community/pull/1241/files -type Manager interface { - // LoadPods will load checkpointed Pods from disk - LoadPods() ([]*v1.Pod, error) - - // WritePod will serialize a Pod to disk - WritePod(pod *v1.Pod) error - - // Deletes the checkpoint of the given pod from disk - DeletePod(pod *v1.Pod) error +type PodCheckpoint interface { + checkpointmanager.Checkpoint + GetPod() *v1.Pod } -var instance Manager -var mutex = &sync.Mutex{} - -// fileCheckPointManager - is a checkpointer that writes contents to disk -// The type information of the resource objects are encoded in the name -type fileCheckPointManager struct { - path string +// Data to be stored as checkpoint +type Data struct { + Pod *v1.Pod + Checksum checksum.Checksum } -// NewCheckpointManager will create a Manager that points to the following path -func NewCheckpointManager(path string) Manager { - // NOTE: This is a precaution; current implementation should not run - // multiple checkpoint managers. - mutex.Lock() - defer mutex.Unlock() - instance = &fileCheckPointManager{path: path} - return instance +// NewPodCheckpoint returns new pod checkpoint +func NewPodCheckpoint(pod *v1.Pod) PodCheckpoint { + return &Data{Pod: pod} } -// GetInstance will return the current Manager, there should be only one. -func GetInstance() Manager { - mutex.Lock() - defer mutex.Unlock() - return instance +// MarshalCheckpoint returns marshalled data +func (cp *Data) MarshalCheckpoint() ([]byte, error) { + cp.Checksum = checksum.New(*cp.Pod) + return json.Marshal(*cp) } -// loadPod will load Pod Checkpoint yaml file. -func (fcp *fileCheckPointManager) loadPod(file string) (*v1.Pod, error) { - return util.LoadPodFromFile(file) +// UnmarshalCheckpoint returns unmarshalled data +func (cp *Data) UnmarshalCheckpoint(blob []byte) error { + return json.Unmarshal(blob, cp) +} + +// VerifyChecksum verifies that passed checksum is same as calculated checksum +func (cp *Data) VerifyChecksum() error { + return cp.Checksum.Verify(*cp.Pod) +} + +func (cp *Data) GetPod() *v1.Pod { + return cp.Pod } // checkAnnotations will validate the checkpoint annotations exist on the Pod -func (fcp *fileCheckPointManager) checkAnnotations(pod *v1.Pod) bool { +func checkAnnotations(pod *v1.Pod) bool { if podAnnotations := pod.GetAnnotations(); podAnnotations != nil { if podAnnotations[core.BootstrapCheckpointAnnotationKey] == "true" { return true @@ -95,57 +80,49 @@ func (fcp *fileCheckPointManager) checkAnnotations(pod *v1.Pod) bool { return false } -// getPodPath returns the full qualified path for the pod checkpoint -func (fcp *fileCheckPointManager) getPodPath(pod *v1.Pod) string { - return fmt.Sprintf("%v/Pod%v%v.yaml", fcp.path, delimiter, pod.GetUID()) +//getPodKey returns the full qualified path for the pod checkpoint +func getPodKey(pod *v1.Pod) string { + return fmt.Sprintf("Pod%v%v.yaml", delimiter, pod.GetUID()) } // LoadPods Loads All Checkpoints from disk -func (fcp *fileCheckPointManager) LoadPods() ([]*v1.Pod, error) { - checkpoints := make([]*v1.Pod, 0) - files, err := ioutil.ReadDir(fcp.path) +func LoadPods(cpm checkpointmanager.CheckpointManager) ([]*v1.Pod, error) { + pods := make([]*v1.Pod, 0) + + var err error + checkpointKeys := []string{} + checkpointKeys, err = cpm.ListCheckpoints() if err != nil { - return nil, err + glog.Errorf("Failed to list checkpoints: %v", err) } - for _, f := range files { - // get just the filename - _, fname := filepath.Split(f.Name()) - // Get just the Resource from "Resource_Name" - fnfields := strings.Split(fname, delimiter) - switch fnfields[0] { - case podPrefix: - pod, err := fcp.loadPod(fmt.Sprintf("%s/%s", fcp.path, f.Name())) - if err != nil { - return nil, err - } - checkpoints = append(checkpoints, pod) - default: - glog.Warningf("Unsupported checkpoint file detected %v", f) + + for _, key := range checkpointKeys { + checkpoint := NewPodCheckpoint(nil) + err := cpm.GetCheckpoint(key, checkpoint) + if err != nil { + glog.Errorf("Failed to retrieve checkpoint for pod %q: %v", key, err) + continue } + pods = append(pods, checkpoint.GetPod()) } - return checkpoints, nil + return pods, nil } -// Writes a checkpoint to a file on disk if annotation is present -func (fcp *fileCheckPointManager) WritePod(pod *v1.Pod) error { +// WritePod a checkpoint to a file on disk if annotation is present +func WritePod(cpm checkpointmanager.CheckpointManager, pod *v1.Pod) error { var err error - if fcp.checkAnnotations(pod) { - if blob, err := yaml.Marshal(pod); err == nil { - err = safefile.WriteFile(fcp.getPodPath(pod), blob, 0644) - } + if checkAnnotations(pod) { + data := NewPodCheckpoint(pod) + err = cpm.CreateCheckpoint(getPodKey(pod), data) } else { // This is to handle an edge where a pod update could remove // an annotation and the checkpoint should then be removed. - err = fcp.DeletePod(pod) + err = cpm.RemoveCheckpoint(getPodKey(pod)) } return err } -// Deletes a checkpoint from disk if present -func (fcp *fileCheckPointManager) DeletePod(pod *v1.Pod) error { - podPath := fcp.getPodPath(pod) - if err := os.Remove(podPath); !os.IsNotExist(err) { - return err - } - return nil +// DeletePod deletes a checkpoint from disk if present +func DeletePod(cpm checkpointmanager.CheckpointManager, pod *v1.Pod) error { + return cpm.RemoveCheckpoint(getPodKey(pod)) } diff --git a/pkg/kubelet/checkpoint/checkpoint_test.go b/pkg/kubelet/checkpoint/checkpoint_test.go index 23c35c2c019..e93e61d4ee8 100644 --- a/pkg/kubelet/checkpoint/checkpoint_test.go +++ b/pkg/kubelet/checkpoint/checkpoint_test.go @@ -25,6 +25,7 @@ import ( "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/kubernetes/pkg/apis/core" + "k8s.io/kubernetes/pkg/kubelet/checkpointmanager" ) // TestWriteLoadDeletePods validates all combinations of write, load, and delete @@ -70,15 +71,18 @@ func TestWriteLoadDeletePods(t *testing.T) { } defer os.RemoveAll(dir) - cp := NewCheckpointManager(dir) + cpm, err := checkpointmanager.NewCheckpointManager(dir) + if err != nil { + t.Errorf("Failed to initialize checkpoint manager error=%v", err) + } for _, p := range testPods { // Write pods should always pass unless there is an fs error - if err := cp.WritePod(p.pod); err != nil { + if err := WritePod(cpm, p.pod); err != nil { t.Errorf("Failed to Write Pod: %v", err) } } // verify the correct written files are loaded from disk - pods, err := cp.LoadPods() + pods, err := LoadPods(cpm) if err != nil { t.Errorf("Failed to Load Pods: %v", err) } @@ -104,7 +108,7 @@ func TestWriteLoadDeletePods(t *testing.T) { } else if lpod != nil { t.Errorf("Got unexpected result for %v, should not have been loaded", pname) } - err = cp.DeletePod(p.pod) + err = DeletePod(cpm, p.pod) if err != nil { t.Errorf("Failed to delete pod %v", pname) } diff --git a/pkg/kubelet/checkpointmanager/BUILD b/pkg/kubelet/checkpointmanager/BUILD index c27bb8f5898..0353c2d1ca8 100644 --- a/pkg/kubelet/checkpointmanager/BUILD +++ b/pkg/kubelet/checkpointmanager/BUILD @@ -22,7 +22,7 @@ go_test( srcs = ["checkpoint_manager_test.go"], embed = [":go_default_library"], deps = [ - "//pkg/kubelet/checkpointmanager/errors:go_default_library", + "//pkg/kubelet/checkpointmanager/checksum:go_default_library", "//pkg/kubelet/checkpointmanager/testing:go_default_library", "//pkg/kubelet/checkpointmanager/testing/example_checkpoint_formats/v1:go_default_library", "//vendor/github.com/stretchr/testify/assert:go_default_library", diff --git a/pkg/kubelet/checkpointmanager/README.md b/pkg/kubelet/checkpointmanager/README.md index d5b6fd6eb74..07beef1300f 100644 --- a/pkg/kubelet/checkpointmanager/README.md +++ b/pkg/kubelet/checkpointmanager/README.md @@ -1,8 +1,9 @@ ## DISCLAIMER -Sig-Node community has reached a general consensus, as a best practice, to +- Sig-Node community has reached a general consensus, as a best practice, to avoid introducing any new checkpointing support. We reached this understanding after struggling with some hard-to-debug issues in the production environments caused by the checkpointing. +- Any changes to the checkpointed data structure would be considered incompatible and a component should add its own handling if it needs to ensure backward compatibility of reading old-format checkpoint files. ## Introduction This folder contains a framework & primitives, Checkpointing Manager, which is diff --git a/pkg/kubelet/checkpointmanager/checkpoint_manager.go b/pkg/kubelet/checkpointmanager/checkpoint_manager.go index 1d078357bff..f16e96044f2 100644 --- a/pkg/kubelet/checkpointmanager/checkpoint_manager.go +++ b/pkg/kubelet/checkpointmanager/checkpoint_manager.go @@ -18,6 +18,7 @@ package checkpointmanager import ( "fmt" + "sync" "k8s.io/kubernetes/pkg/kubelet/checkpointmanager/errors" utilstore "k8s.io/kubernetes/pkg/kubelet/util/store" @@ -48,8 +49,10 @@ type CheckpointManager interface { type impl struct { path string store utilstore.Store + mutex sync.Mutex } +// NewCheckpointManager returns a new instance of a checkpoint manager func NewCheckpointManager(checkpointDir string) (CheckpointManager, error) { fstore, err := utilstore.NewFileStore(checkpointDir, utilfs.DefaultFs{}) if err != nil { @@ -88,6 +91,7 @@ func (manager *impl) GetCheckpoint(checkpointKey string, checkpoint Checkpoint) return err } +// RemoveCheckpoint will not return error if checkpoint does not exist. func (manager *impl) RemoveCheckpoint(checkpointKey string) error { manager.mutex.Lock() defer manager.mutex.Unlock() diff --git a/pkg/kubelet/checkpointmanager/checkpoint_manager_test.go b/pkg/kubelet/checkpointmanager/checkpoint_manager_test.go index 4d02c4d5ac0..6081e90483a 100644 --- a/pkg/kubelet/checkpointmanager/checkpoint_manager_test.go +++ b/pkg/kubelet/checkpointmanager/checkpoint_manager_test.go @@ -18,12 +18,11 @@ package checkpointmanager import ( "encoding/json" - "hash/fnv" "sort" "testing" "github.com/stretchr/testify/assert" - "k8s.io/kubernetes/pkg/kubelet/checkpointmanager/errors" + "k8s.io/kubernetes/pkg/kubelet/checkpointmanager/checksum" utilstore "k8s.io/kubernetes/pkg/kubelet/checkpointmanager/testing" "k8s.io/kubernetes/pkg/kubelet/checkpointmanager/testing/example_checkpoint_formats/v1" ) @@ -50,7 +49,7 @@ type CheckpointDataV2 struct { type protocol string // portMapping is the port mapping configurations of a sandbox. -type portMapping struct { +type PortMapping struct { // protocol of the port mapping. Protocol *protocol // Port number within the container. @@ -153,7 +152,7 @@ func TestCheckpointManager(t *testing.T) { port443 := int32(443) proto := protocol("tcp") - portMappings := []*portMapping{ + portMappings := []*PortMapping{ { &proto, &port80, diff --git a/pkg/kubelet/checkpointmanager/errors/errors.go b/pkg/kubelet/checkpointmanager/errors/errors.go index 25462ffd9c8..bb445f207ec 100644 --- a/pkg/kubelet/checkpointmanager/errors/errors.go +++ b/pkg/kubelet/checkpointmanager/errors/errors.go @@ -18,5 +18,8 @@ package errors import "fmt" -var CorruptCheckpointError = fmt.Errorf("checkpoint is corrupted.") -var CheckpointNotFoundError = fmt.Errorf("checkpoint is not found.") +// ErrCorruptCheckpoint error is reported when checksum does not match +var ErrCorruptCheckpoint = fmt.Errorf("checkpoint is corrupted") + +// ErrCheckpointNotFound is reported when checkpoint is not found for a given key +var ErrCheckpointNotFound = fmt.Errorf("checkpoint is not found") diff --git a/pkg/kubelet/checkpointmanager/testing/util.go b/pkg/kubelet/checkpointmanager/testing/util.go index 34bbb5303ef..8ca4af1d32f 100644 --- a/pkg/kubelet/checkpointmanager/testing/util.go +++ b/pkg/kubelet/checkpointmanager/testing/util.go @@ -27,10 +27,12 @@ type MemStore struct { sync.Mutex } +// NewMemStore returns an instance of MemStore func NewMemStore() *MemStore { return &MemStore{mem: make(map[string][]byte)} } +// Write writes the data to the store func (mstore *MemStore) Write(key string, data []byte) error { mstore.Lock() defer mstore.Unlock() @@ -38,6 +40,7 @@ func (mstore *MemStore) Write(key string, data []byte) error { return nil } +// Read returns data read from store func (mstore *MemStore) Read(key string) ([]byte, error) { mstore.Lock() defer mstore.Unlock() @@ -48,6 +51,7 @@ func (mstore *MemStore) Read(key string) ([]byte, error) { return data, nil } +// Delete deletes data from the store func (mstore *MemStore) Delete(key string) error { mstore.Lock() defer mstore.Unlock() @@ -55,6 +59,7 @@ func (mstore *MemStore) Delete(key string) error { return nil } +// List returns all the keys from the store func (mstore *MemStore) List() ([]string, error) { mstore.Lock() defer mstore.Unlock() diff --git a/pkg/kubelet/cm/devicemanager/manager_test.go b/pkg/kubelet/cm/devicemanager/manager_test.go index 66b63999ae3..6c60e2c16b1 100644 --- a/pkg/kubelet/cm/devicemanager/manager_test.go +++ b/pkg/kubelet/cm/devicemanager/manager_test.go @@ -17,7 +17,6 @@ limitations under the License. package devicemanager import ( - "flag" "fmt" "io/ioutil" "os" @@ -548,7 +547,6 @@ type TestResource struct { } func TestPodContainerDeviceAllocation(t *testing.T) { - flag.Set("alsologtostderr", fmt.Sprintf("%t", true)) res1 := TestResource{ resourceName: "domain1.com/resource1", resourceQuantity: *resource.NewQuantity(int64(2), resource.DecimalSI), @@ -753,7 +751,7 @@ func TestSanitizeNodeAllocatable(t *testing.T) { as.Nil(err) testManager := &ManagerImpl{ callback: monitorCallback, - allDevices: make(map[string]sets.String), + allocatedDevices: make(map[string]sets.String), healthyDevices: make(map[string]sets.String), podDevices: make(podDevices), checkpointManager: ckm, diff --git a/pkg/kubelet/cm/devicemanager/pod_devices.go b/pkg/kubelet/cm/devicemanager/pod_devices.go index 6dcb3a51a56..7df0e201c71 100644 --- a/pkg/kubelet/cm/devicemanager/pod_devices.go +++ b/pkg/kubelet/cm/devicemanager/pod_devices.go @@ -144,7 +144,12 @@ func (pdev podDevices) toCheckpointData() []checkpoint.PodDevicesEntry { glog.Errorf("Can't marshal allocResp for %v %v %v: %v", podUID, conName, resource, err) continue } - data = append(data, checkpoint.PodDevicesEntry{podUID, conName, resource, devIds, allocResp}) + data = append(data, checkpoint.PodDevicesEntry{ + PodUID: podUID, + ContainerName: conName, + ResourceName: resource, + DeviceIDs: devIds, + AllocResp: allocResp}) } } } diff --git a/pkg/kubelet/config/BUILD b/pkg/kubelet/config/BUILD index 7dd54bc3f4b..37e72d75e2a 100644 --- a/pkg/kubelet/config/BUILD +++ b/pkg/kubelet/config/BUILD @@ -58,6 +58,7 @@ go_library( "//pkg/apis/core/v1:go_default_library", "//pkg/apis/core/validation:go_default_library", "//pkg/kubelet/checkpoint:go_default_library", + "//pkg/kubelet/checkpointmanager:go_default_library", "//pkg/kubelet/container:go_default_library", "//pkg/kubelet/events:go_default_library", "//pkg/kubelet/types:go_default_library", diff --git a/pkg/kubelet/config/config.go b/pkg/kubelet/config/config.go index c03e32512af..6ffb448254b 100644 --- a/pkg/kubelet/config/config.go +++ b/pkg/kubelet/config/config.go @@ -27,6 +27,7 @@ import ( "k8s.io/apimachinery/pkg/util/sets" "k8s.io/client-go/tools/record" "k8s.io/kubernetes/pkg/kubelet/checkpoint" + "k8s.io/kubernetes/pkg/kubelet/checkpointmanager" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" "k8s.io/kubernetes/pkg/kubelet/events" kubetypes "k8s.io/kubernetes/pkg/kubelet/types" @@ -64,7 +65,7 @@ type PodConfig struct { // contains the list of all configured sources sourcesLock sync.Mutex sources sets.String - checkpointManager checkpoint.Manager + checkpointManager checkpointmanager.CheckpointManager } // NewPodConfig creates an object that can merge many configuration sources into a stream @@ -114,10 +115,12 @@ func (c *PodConfig) Sync() { func (c *PodConfig) Restore(path string, updates chan<- interface{}) error { var err error if c.checkpointManager == nil { - c.checkpointManager = checkpoint.NewCheckpointManager(path) - pods, err := c.checkpointManager.LoadPods() - if err == nil { - updates <- kubetypes.PodUpdate{Pods: pods, Op: kubetypes.RESTORE, Source: kubetypes.ApiserverSource} + c.checkpointManager, err = checkpointmanager.NewCheckpointManager(path) + if err != nil { + pods, err := checkpoint.LoadPods(c.checkpointManager) + if err == nil { + updates <- kubetypes.PodUpdate{Pods: pods, Op: kubetypes.RESTORE, Source: kubetypes.ApiserverSource} + } } } return err diff --git a/pkg/kubelet/dockershim/BUILD b/pkg/kubelet/dockershim/BUILD index fa8ab7d6a9c..ea7f430fdf5 100644 --- a/pkg/kubelet/dockershim/BUILD +++ b/pkg/kubelet/dockershim/BUILD @@ -155,7 +155,6 @@ go_test( "//pkg/kubelet/dockershim/libdocker:go_default_library", "//pkg/kubelet/dockershim/network:go_default_library", "//pkg/kubelet/dockershim/network/testing:go_default_library", - "//pkg/kubelet/dockershim/testing:go_default_library", "//pkg/kubelet/types:go_default_library", "//pkg/kubelet/util/cache:go_default_library", "//pkg/security/apparmor:go_default_library", diff --git a/pkg/kubelet/dockershim/docker_service_test.go b/pkg/kubelet/dockershim/docker_service_test.go index 2b223389205..cc42ef5cabb 100644 --- a/pkg/kubelet/dockershim/docker_service_test.go +++ b/pkg/kubelet/dockershim/docker_service_test.go @@ -68,7 +68,7 @@ func (ckm *mockCheckpointManager) RemoveCheckpoint(checkpointKey string) error { func (ckm *mockCheckpointManager) ListCheckpoints() ([]string, error) { var keys []string - for key, _ := range ckm.checkpoint { + for key := range ckm.checkpoint { keys = append(keys, key) } return keys, nil diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index 9a1755ffe7f..87b8befcf14 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -61,6 +61,7 @@ import ( kubeletconfiginternal "k8s.io/kubernetes/pkg/kubelet/apis/kubeletconfig" "k8s.io/kubernetes/pkg/kubelet/cadvisor" kubeletcertificate "k8s.io/kubernetes/pkg/kubelet/certificate" + "k8s.io/kubernetes/pkg/kubelet/checkpointmanager" "k8s.io/kubernetes/pkg/kubelet/cm" "k8s.io/kubernetes/pkg/kubelet/config" "k8s.io/kubernetes/pkg/kubelet/configmap" @@ -553,8 +554,15 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration, klet.livenessManager = proberesults.NewManager() klet.podCache = kubecontainer.NewCache() + var checkpointManager checkpointmanager.CheckpointManager + if bootstrapCheckpointPath != "" { + checkpointManager, err = checkpointmanager.NewCheckpointManager(bootstrapCheckpointPath) + if err != nil { + return nil, fmt.Errorf("failed to initialize checkpoint manager: %+v", err) + } + } // podManager is also responsible for keeping secretManager and configMapManager contents up-to-date. - klet.podManager = kubepod.NewBasicPodManager(kubepod.NewBasicMirrorClient(klet.kubeClient), secretManager, configMapManager) + klet.podManager = kubepod.NewBasicPodManager(kubepod.NewBasicMirrorClient(klet.kubeClient), secretManager, configMapManager, checkpointManager) if remoteRuntimeEndpoint != "" { // remoteImageEndpoint is same as remoteRuntimeEndpoint if not explicitly specified diff --git a/pkg/kubelet/kubelet_test.go b/pkg/kubelet/kubelet_test.go index be074af61a4..a3df9ae555f 100644 --- a/pkg/kubelet/kubelet_test.go +++ b/pkg/kubelet/kubelet_test.go @@ -215,7 +215,7 @@ func newTestKubeletWithImageList( kubelet.secretManager = secretManager configMapManager := configmap.NewSimpleConfigMapManager(kubelet.kubeClient) kubelet.configMapManager = configMapManager - kubelet.podManager = kubepod.NewBasicPodManager(fakeMirrorClient, kubelet.secretManager, kubelet.configMapManager) + kubelet.podManager = kubepod.NewBasicPodManager(fakeMirrorClient, kubelet.secretManager, kubelet.configMapManager, podtest.NewMockCheckpointManager()) kubelet.statusManager = status.NewManager(fakeKubeClient, kubelet.podManager, &statustest.FakePodDeletionSafetyProvider{}) kubelet.containerRuntime = fakeRuntime diff --git a/pkg/kubelet/mountpod/mount_pod_test.go b/pkg/kubelet/mountpod/mount_pod_test.go index 3f84739fff9..bf7a0ec5602 100644 --- a/pkg/kubelet/mountpod/mount_pod_test.go +++ b/pkg/kubelet/mountpod/mount_pod_test.go @@ -52,7 +52,7 @@ func TestGetVolumeExec(t *testing.T) { fakeSecretManager := secret.NewFakeManager() fakeConfigMapManager := configmap.NewFakeManager() podManager := kubepod.NewBasicPodManager( - podtest.NewFakeMirrorClient(), fakeSecretManager, fakeConfigMapManager) + podtest.NewFakeMirrorClient(), fakeSecretManager, fakeConfigMapManager, podtest.NewMockCheckpointManager()) podManager.SetPods(pods) // Prepare fake /var/lib/kubelet diff --git a/pkg/kubelet/pod/BUILD b/pkg/kubelet/pod/BUILD index bba1a722c30..93e77dd955c 100644 --- a/pkg/kubelet/pod/BUILD +++ b/pkg/kubelet/pod/BUILD @@ -15,6 +15,7 @@ go_library( importpath = "k8s.io/kubernetes/pkg/kubelet/pod", deps = [ "//pkg/kubelet/checkpoint:go_default_library", + "//pkg/kubelet/checkpointmanager:go_default_library", "//pkg/kubelet/configmap:go_default_library", "//pkg/kubelet/container:go_default_library", "//pkg/kubelet/secret:go_default_library", diff --git a/pkg/kubelet/pod/pod_manager.go b/pkg/kubelet/pod/pod_manager.go index be15616a9ea..6033ae8d50a 100644 --- a/pkg/kubelet/pod/pod_manager.go +++ b/pkg/kubelet/pod/pod_manager.go @@ -24,6 +24,7 @@ import ( "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/kubernetes/pkg/kubelet/checkpoint" + "k8s.io/kubernetes/pkg/kubelet/checkpointmanager" "k8s.io/kubernetes/pkg/kubelet/configmap" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" "k8s.io/kubernetes/pkg/kubelet/secret" @@ -121,18 +122,18 @@ type basicManager struct { // basicManager is keeping secretManager and configMapManager up-to-date. secretManager secret.Manager configMapManager configmap.Manager - checkpointManager checkpoint.Manager + checkpointManager checkpointmanager.CheckpointManager // A mirror pod client to create/delete mirror pods. MirrorClient } // NewBasicPodManager returns a functional Manager. -func NewBasicPodManager(client MirrorClient, secretManager secret.Manager, configMapManager configmap.Manager) Manager { +func NewBasicPodManager(client MirrorClient, secretManager secret.Manager, configMapManager configmap.Manager, cpm checkpointmanager.CheckpointManager) Manager { pm := &basicManager{} pm.secretManager = secretManager pm.configMapManager = configMapManager - pm.checkpointManager = checkpoint.GetInstance() + pm.checkpointManager = cpm pm.MirrorClient = client pm.SetPods(nil) return pm @@ -161,7 +162,7 @@ func (pm *basicManager) UpdatePod(pod *v1.Pod) { defer pm.lock.Unlock() pm.updatePodsInternal(pod) if pm.checkpointManager != nil { - if err := pm.checkpointManager.WritePod(pod); err != nil { + if err := checkpoint.WritePod(pm.checkpointManager, pod); err != nil { glog.Errorf("Error writing checkpoint for pod: %v", pod.GetName()) } } @@ -224,7 +225,7 @@ func (pm *basicManager) DeletePod(pod *v1.Pod) { delete(pm.podByFullName, podFullName) } if pm.checkpointManager != nil { - if err := pm.checkpointManager.DeletePod(pod); err != nil { + if err := checkpoint.DeletePod(pm.checkpointManager, pod); err != nil { glog.Errorf("Error deleting checkpoint for pod: %v", pod.GetName()) } } diff --git a/pkg/kubelet/pod/pod_manager_test.go b/pkg/kubelet/pod/pod_manager_test.go index 557a5927f3e..0c24325bd6e 100644 --- a/pkg/kubelet/pod/pod_manager_test.go +++ b/pkg/kubelet/pod/pod_manager_test.go @@ -34,7 +34,7 @@ func newTestManager() (*basicManager, *podtest.FakeMirrorClient) { fakeMirrorClient := podtest.NewFakeMirrorClient() secretManager := secret.NewFakeManager() configMapManager := configmap.NewFakeManager() - manager := NewBasicPodManager(fakeMirrorClient, secretManager, configMapManager).(*basicManager) + manager := NewBasicPodManager(fakeMirrorClient, secretManager, configMapManager, podtest.NewMockCheckpointManager()).(*basicManager) return manager, fakeMirrorClient } diff --git a/pkg/kubelet/pod/testing/BUILD b/pkg/kubelet/pod/testing/BUILD index b0364a226fe..28daeb0e36d 100644 --- a/pkg/kubelet/pod/testing/BUILD +++ b/pkg/kubelet/pod/testing/BUILD @@ -13,6 +13,8 @@ go_library( ], importpath = "k8s.io/kubernetes/pkg/kubelet/pod/testing", deps = [ + "//pkg/kubelet/checkpoint:go_default_library", + "//pkg/kubelet/checkpointmanager:go_default_library", "//pkg/kubelet/container:go_default_library", "//pkg/kubelet/types:go_default_library", "//vendor/github.com/stretchr/testify/mock:go_default_library", diff --git a/pkg/kubelet/pod/testing/fake_mirror_client.go b/pkg/kubelet/pod/testing/fake_mirror_client.go index 7a7dd6ae639..a36a80a2c94 100644 --- a/pkg/kubelet/pod/testing/fake_mirror_client.go +++ b/pkg/kubelet/pod/testing/fake_mirror_client.go @@ -21,6 +21,8 @@ import ( "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/util/sets" + cp "k8s.io/kubernetes/pkg/kubelet/checkpoint" + "k8s.io/kubernetes/pkg/kubelet/checkpointmanager" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" ) @@ -81,3 +83,37 @@ func (fmc *FakeMirrorClient) GetCounts(podFullName string) (int, int) { defer fmc.mirrorPodLock.RUnlock() return fmc.createCounts[podFullName], fmc.deleteCounts[podFullName] } + +type MockCheckpointManager struct { + checkpoint map[string]*cp.Data +} + +func (ckm *MockCheckpointManager) CreateCheckpoint(checkpointKey string, checkpoint checkpointmanager.Checkpoint) error { + ckm.checkpoint[checkpointKey] = (checkpoint.(*cp.Data)) + return nil +} + +func (ckm *MockCheckpointManager) GetCheckpoint(checkpointKey string, checkpoint checkpointmanager.Checkpoint) error { + *(checkpoint.(*cp.Data)) = *(ckm.checkpoint[checkpointKey]) + return nil +} + +func (ckm *MockCheckpointManager) RemoveCheckpoint(checkpointKey string) error { + _, ok := ckm.checkpoint[checkpointKey] + if ok { + delete(ckm.checkpoint, "moo") + } + return nil +} + +func (ckm *MockCheckpointManager) ListCheckpoints() ([]string, error) { + var keys []string + for key := range ckm.checkpoint { + keys = append(keys, key) + } + return keys, nil +} + +func NewMockCheckpointManager() checkpointmanager.CheckpointManager { + return &MockCheckpointManager{checkpoint: make(map[string]*cp.Data)} +} diff --git a/pkg/kubelet/prober/common_test.go b/pkg/kubelet/prober/common_test.go index f19ecd97b06..74cd10b4bff 100644 --- a/pkg/kubelet/prober/common_test.go +++ b/pkg/kubelet/prober/common_test.go @@ -99,7 +99,7 @@ func setTestProbe(pod *v1.Pod, probeType probeType, probeSpec v1.Probe) { func newTestManager() *manager { refManager := kubecontainer.NewRefManager() refManager.SetRef(testContainerID, &v1.ObjectReference{}) // Suppress prober warnings. - podManager := kubepod.NewBasicPodManager(nil, nil, nil) + podManager := kubepod.NewBasicPodManager(nil, nil, nil, nil) // Add test pod to pod manager, so that status manager can get the pod from pod manager if needed. podManager.AddPod(getTestPod()) m := NewManager( diff --git a/pkg/kubelet/prober/worker_test.go b/pkg/kubelet/prober/worker_test.go index f05723e1c84..a36c0afde60 100644 --- a/pkg/kubelet/prober/worker_test.go +++ b/pkg/kubelet/prober/worker_test.go @@ -118,7 +118,7 @@ func TestDoProbe(t *testing.T) { } // Clean up. - m.statusManager = status.NewManager(&fake.Clientset{}, kubepod.NewBasicPodManager(nil, nil, nil), &statustest.FakePodDeletionSafetyProvider{}) + m.statusManager = status.NewManager(&fake.Clientset{}, kubepod.NewBasicPodManager(nil, nil, nil, nil), &statustest.FakePodDeletionSafetyProvider{}) resultsManager(m, probeType).Remove(testContainerID) } } diff --git a/pkg/kubelet/runonce_test.go b/pkg/kubelet/runonce_test.go index 63b37ff2ab7..d8e943c1f75 100644 --- a/pkg/kubelet/runonce_test.go +++ b/pkg/kubelet/runonce_test.go @@ -63,7 +63,7 @@ func TestRunOnce(t *testing.T) { fakeSecretManager := secret.NewFakeManager() fakeConfigMapManager := configmap.NewFakeManager() podManager := kubepod.NewBasicPodManager( - podtest.NewFakeMirrorClient(), fakeSecretManager, fakeConfigMapManager) + podtest.NewFakeMirrorClient(), fakeSecretManager, fakeConfigMapManager, podtest.NewMockCheckpointManager()) fakeRuntime := &containertest.FakeRuntime{} basePath, err := utiltesting.MkTmpdir("kubelet") if err != nil { diff --git a/pkg/kubelet/status/status_manager_test.go b/pkg/kubelet/status/status_manager_test.go index 6e68b20d5ed..84ddec36e3b 100644 --- a/pkg/kubelet/status/status_manager_test.go +++ b/pkg/kubelet/status/status_manager_test.go @@ -75,7 +75,7 @@ func (m *manager) testSyncBatch() { } func newTestManager(kubeClient clientset.Interface) *manager { - podManager := kubepod.NewBasicPodManager(podtest.NewFakeMirrorClient(), kubesecret.NewFakeManager(), kubeconfigmap.NewFakeManager()) + podManager := kubepod.NewBasicPodManager(podtest.NewFakeMirrorClient(), kubesecret.NewFakeManager(), kubeconfigmap.NewFakeManager(), podtest.NewMockCheckpointManager()) podManager.AddPod(getTestPod()) return NewManager(kubeClient, podManager, &statustest.FakePodDeletionSafetyProvider{}).(*manager) } diff --git a/pkg/kubelet/volumemanager/populator/desired_state_of_world_populator_test.go b/pkg/kubelet/volumemanager/populator/desired_state_of_world_populator_test.go index 90c6bf182a5..1ecf9674dd8 100644 --- a/pkg/kubelet/volumemanager/populator/desired_state_of_world_populator_test.go +++ b/pkg/kubelet/volumemanager/populator/desired_state_of_world_populator_test.go @@ -523,7 +523,7 @@ func createDswpWithVolume(t *testing.T, pv *v1.PersistentVolume, pvc *v1.Persist fakeSecretManager := secret.NewFakeManager() fakeConfigMapManager := configmap.NewFakeManager() fakePodManager := kubepod.NewBasicPodManager( - podtest.NewFakeMirrorClient(), fakeSecretManager, fakeConfigMapManager) + podtest.NewFakeMirrorClient(), fakeSecretManager, fakeConfigMapManager, podtest.NewMockCheckpointManager()) fakesDSW := cache.NewDesiredStateOfWorld(fakeVolumePluginMgr) fakeASW := cache.NewActualStateOfWorld("fake", fakeVolumePluginMgr) diff --git a/pkg/kubelet/volumemanager/volume_manager_test.go b/pkg/kubelet/volumemanager/volume_manager_test.go index 6e9b01fdb03..0d5af922627 100644 --- a/pkg/kubelet/volumemanager/volume_manager_test.go +++ b/pkg/kubelet/volumemanager/volume_manager_test.go @@ -55,7 +55,8 @@ func TestGetMountedVolumesForPodAndGetVolumesInUse(t *testing.T) { t.Fatalf("can't make a temp dir: %v", err) } defer os.RemoveAll(tmpDir) - podManager := kubepod.NewBasicPodManager(podtest.NewFakeMirrorClient(), secret.NewFakeManager(), configmap.NewFakeManager()) + cpm := podtest.NewMockCheckpointManager() + podManager := kubepod.NewBasicPodManager(podtest.NewFakeMirrorClient(), secret.NewFakeManager(), configmap.NewFakeManager(), cpm) node, pod, pv, claim := createObjects() kubeClient := fake.NewSimpleClientset(node, pod, pv, claim) @@ -97,7 +98,8 @@ func TestInitialPendingVolumesForPodAndGetVolumesInUse(t *testing.T) { t.Fatalf("can't make a temp dir: %v", err) } defer os.RemoveAll(tmpDir) - podManager := kubepod.NewBasicPodManager(podtest.NewFakeMirrorClient(), secret.NewFakeManager(), configmap.NewFakeManager()) + cpm := podtest.NewMockCheckpointManager() + podManager := kubepod.NewBasicPodManager(podtest.NewFakeMirrorClient(), secret.NewFakeManager(), configmap.NewFakeManager(), cpm) node, pod, pv, claim := createObjects() claim.Status = v1.PersistentVolumeClaimStatus{ @@ -135,7 +137,8 @@ func TestGetExtraSupplementalGroupsForPod(t *testing.T) { t.Fatalf("can't make a temp dir: %v", err) } defer os.RemoveAll(tmpDir) - podManager := kubepod.NewBasicPodManager(podtest.NewFakeMirrorClient(), secret.NewFakeManager(), configmap.NewFakeManager()) + cpm := podtest.NewMockCheckpointManager() + podManager := kubepod.NewBasicPodManager(podtest.NewFakeMirrorClient(), secret.NewFakeManager(), configmap.NewFakeManager(), cpm) node, pod, _, claim := createObjects()