From 51526d310385d6f3791db8bd00ed16abab46658f Mon Sep 17 00:00:00 2001 From: Minhan Xia Date: Wed, 11 Jan 2017 15:40:01 -0800 Subject: [PATCH] Add checkpointHandler to DockerService --- hack/verify-flags/exceptions.txt | 1 + pkg/kubelet/dockershim/BUILD | 8 ++ pkg/kubelet/dockershim/checkpoint_store.go | 6 +- .../dockershim/checkpoint_store_test.go | 7 +- pkg/kubelet/dockershim/convert.go | 14 ++- pkg/kubelet/dockershim/docker_checkpoint.go | 12 +- .../dockershim/docker_checkpoint_test.go | 1 - pkg/kubelet/dockershim/docker_sandbox.go | 115 ++++++++++++++++-- pkg/kubelet/dockershim/docker_service.go | 9 +- pkg/kubelet/dockershim/docker_service_test.go | 2 +- pkg/kubelet/dockershim/testing/BUILD | 27 ++++ pkg/kubelet/network/kubenet/kubenet_linux.go | 2 +- 12 files changed, 171 insertions(+), 33 deletions(-) create mode 100644 pkg/kubelet/dockershim/testing/BUILD diff --git a/hack/verify-flags/exceptions.txt b/hack/verify-flags/exceptions.txt index 12be24fe8f1..600b2ed24d2 100644 --- a/hack/verify-flags/exceptions.txt +++ b/hack/verify-flags/exceptions.txt @@ -120,6 +120,7 @@ pkg/kubelet/api/v1alpha1/runtime/api.proto: int64 oom_score_adj = 5; pkg/kubelet/api/v1alpha1/runtime/api.proto: string pod_cidr = 1; pkg/kubelet/cm/container_manager_linux.go: glog.V(3).Infof("Failed to apply oom_score_adj %d for pid %d: %v", oomScoreAdj, pid, err) pkg/kubelet/cm/container_manager_linux.go: glog.V(5).Infof("attempting to apply oom_score_adj of %d to pid %d", oomScoreAdj, pid) +pkg/kubelet/dockershim/docker_checkpoint.go: ContainerPort *int32 `json:"container_port,omitempty"` pkg/kubelet/network/hairpin/hairpin.go: hairpinModeRelativePath = "hairpin_mode" pkg/kubelet/qos/policy_test.go: t.Errorf("oom_score_adj should be between %d and %d, but was %d", test.lowOOMScoreAdj, test.highOOMScoreAdj, oomScoreAdj) pkg/kubelet/qos/policy_test.go: highOOMScoreAdj int // The min oom_score_adj score the container should be assigned. 
diff --git a/pkg/kubelet/dockershim/BUILD b/pkg/kubelet/dockershim/BUILD index 77ac8e9fde5..e765e3d8cf7 100644 --- a/pkg/kubelet/dockershim/BUILD +++ b/pkg/kubelet/dockershim/BUILD @@ -11,8 +11,10 @@ load( go_library( name = "go_default_library", srcs = [ + "checkpoint_store.go", "convert.go", "doc.go", + "docker_checkpoint.go", "docker_container.go", "docker_image.go", "docker_sandbox.go", @@ -41,6 +43,7 @@ go_library( "//pkg/kubelet/server/streaming:go_default_library", "//pkg/kubelet/types:go_default_library", "//pkg/kubelet/util/ioutils:go_default_library", + "//pkg/util/hash:go_default_library", "//pkg/util/term:go_default_library", "//vendor:github.com/docker/engine-api/types", "//vendor:github.com/docker/engine-api/types/container", @@ -49,13 +52,16 @@ go_library( "//vendor:github.com/docker/engine-api/types/versions", "//vendor:github.com/docker/go-connections/nat", "//vendor:github.com/golang/glog", + "//vendor:k8s.io/apimachinery/pkg/util/errors", ], ) go_test( name = "go_default_test", srcs = [ + "checkpoint_store_test.go", "convert_test.go", + "docker_checkpoint_test.go", "docker_container_test.go", "docker_image_test.go", "docker_sandbox_test.go", @@ -71,6 +77,7 @@ go_test( "//pkg/kubelet/api/v1alpha1/runtime:go_default_library", "//pkg/kubelet/container:go_default_library", "//pkg/kubelet/container/testing:go_default_library", + "//pkg/kubelet/dockershim/testing:go_default_library", "//pkg/kubelet/dockertools:go_default_library", "//pkg/kubelet/dockertools/securitycontext:go_default_library", "//pkg/kubelet/network:go_default_library", @@ -99,6 +106,7 @@ filegroup( ":package-srcs", "//pkg/kubelet/dockershim/cm:all-srcs", "//pkg/kubelet/dockershim/remote:all-srcs", + "//pkg/kubelet/dockershim/testing:all-srcs", ], tags = ["automanaged"], ) diff --git a/pkg/kubelet/dockershim/checkpoint_store.go b/pkg/kubelet/dockershim/checkpoint_store.go index 642bd1889c4..32215174906 100644 --- a/pkg/kubelet/dockershim/checkpoint_store.go +++ 
b/pkg/kubelet/dockershim/checkpoint_store.go @@ -48,9 +48,9 @@ type CheckpointStore interface { List() ([]string, error) } -// FileStore is an implementation of CheckpointStore interface which stores checkpoint in file. +// FileStore is an implementation of CheckpointStore interface which stores checkpoint in files. type FileStore struct { - // path to the base directory for storing checkpoints + // path to the base directory for storing checkpoint files path string } @@ -95,7 +95,7 @@ func (fstore *FileStore) List() ([]string, error) { return keys, err } for _, f := range files { - if !strings.HasSuffix(f.Name(), tmpSuffix) { + if !strings.HasPrefix(f.Name(), tmpPrefix) { keys = append(keys, f.Name()) } } diff --git a/pkg/kubelet/dockershim/checkpoint_store_test.go b/pkg/kubelet/dockershim/checkpoint_store_test.go index 188cad82ef7..44731bc26c8 100644 --- a/pkg/kubelet/dockershim/checkpoint_store_test.go +++ b/pkg/kubelet/dockershim/checkpoint_store_test.go @@ -25,10 +25,6 @@ import ( "github.com/stretchr/testify/assert" ) -const ( - testPath = "/tmp/testFileStore" -) - func TestFileStore(t *testing.T) { path, err := ioutil.TempDir("", "FileStore") assert.NoError(t, err) @@ -154,8 +150,7 @@ func TestIsValidKey(t *testing.T) { func cleanUpTestPath(t *testing.T, path string) { if _, err := os.Stat(path); !os.IsNotExist(err) { if err := os.RemoveAll(path); err != nil { - t.Errorf("Failed to delete test directory: %v", err) + assert.NoError(t, err, "Failed to delete test directory: %v", err) } } - return } diff --git a/pkg/kubelet/dockershim/convert.go b/pkg/kubelet/dockershim/convert.go index c8de3df417a..e77491aefd5 100644 --- a/pkg/kubelet/dockershim/convert.go +++ b/pkg/kubelet/dockershim/convert.go @@ -148,7 +148,7 @@ func toRuntimeAPISandboxState(state string) runtimeapi.PodSandboxState { } } -func toRuntimeAPISandbox(c *dockertypes.Container) (*runtimeapi.PodSandbox, error) { +func containerToRuntimeAPISandbox(c *dockertypes.Container) (*runtimeapi.PodSandbox, 
error) { state := toRuntimeAPISandboxState(c.Status) if len(c.Names) == 0 { return nil, fmt.Errorf("unexpected empty sandbox name: %+v", c) @@ -169,3 +169,15 @@ func toRuntimeAPISandbox(c *dockertypes.Container) (*runtimeapi.PodSandbox, erro Annotations: annotations, }, nil } + +func checkpointToRuntimeAPISandbox(id string, checkpoint *PodSandboxCheckpoint) *runtimeapi.PodSandbox { + state := runtimeapi.PodSandboxState_SANDBOX_NOTREADY + return &runtimeapi.PodSandbox{ + Id: id, + Metadata: &runtimeapi.PodSandboxMetadata{ + Name: checkpoint.Name, + Namespace: checkpoint.Namespace, + }, + State: state, + } +} diff --git a/pkg/kubelet/dockershim/docker_checkpoint.go b/pkg/kubelet/dockershim/docker_checkpoint.go index dfb536a414a..16f95f06259 100644 --- a/pkg/kubelet/dockershim/docker_checkpoint.go +++ b/pkg/kubelet/dockershim/docker_checkpoint.go @@ -19,20 +19,22 @@ package dockershim import ( "encoding/json" "fmt" - "github.com/golang/glog" "hash/fnv" + "path/filepath" + + "github.com/golang/glog" hashutil "k8s.io/kubernetes/pkg/util/hash" ) const ( // default directory to store pod sandbox checkpoint files - sandboxCheckpointDir = "/var/lib/dockershim/sandbox" + sandboxCheckpointDir = "sandbox" protocolTCP = Protocol("tcp") protocolUDP = Protocol("udp") schemaVersion = "v1" ) -var CorruptCheckpointError = fmt.Errorf("Checkpoint is corrupted.") +var CorruptCheckpointError = fmt.Errorf("checkpoint is corrupted.") type Protocol string @@ -84,7 +86,7 @@ type PersistentCheckpointHandler struct { } func NewPersistentCheckpointHandler() CheckpointHandler { - return &PersistentCheckpointHandler{store: &FileStore{path: sandboxCheckpointDir}} + return &PersistentCheckpointHandler{store: &FileStore{path: filepath.Join(dockershimRootDir, sandboxCheckpointDir)}} } func (handler *PersistentCheckpointHandler) CreateCheckpoint(podSandboxID string, checkpoint *PodSandboxCheckpoint) error { @@ -105,7 +107,7 @@ func (handler *PersistentCheckpointHandler) GetCheckpoint(podSandboxID 
string) ( //TODO: unmarhsal into a struct with just Version, check version, unmarshal into versioned type. err = json.Unmarshal(blob, &checkpoint) if err != nil { - glog.Errorf("Failed to unmarshal checkpoint %q: %v", podSandboxID, err) + glog.Errorf("Failed to unmarshal checkpoint %q. Checkpoint content: %q. ErrMsg: %v", podSandboxID, string(blob), err) return &checkpoint, CorruptCheckpointError } if checkpoint.CheckSum != calculateChecksum(checkpoint) { diff --git a/pkg/kubelet/dockershim/docker_checkpoint_test.go b/pkg/kubelet/dockershim/docker_checkpoint_test.go index c5d428329d3..477c9204d90 100644 --- a/pkg/kubelet/dockershim/docker_checkpoint_test.go +++ b/pkg/kubelet/dockershim/docker_checkpoint_test.go @@ -73,7 +73,6 @@ func TestPersistentCheckpointHandler(t *testing.T) { assert.NoError(t, err) assert.Equal(t, *checkpoint, *tc.checkpoint) } - // Test ListCheckpoints keys, err := handler.ListCheckpoints() assert.NoError(t, err) diff --git a/pkg/kubelet/dockershim/docker_sandbox.go b/pkg/kubelet/dockershim/docker_sandbox.go index def3268883f..535ed569731 100644 --- a/pkg/kubelet/dockershim/docker_sandbox.go +++ b/pkg/kubelet/dockershim/docker_sandbox.go @@ -24,6 +24,7 @@ import ( dockerfilters "github.com/docker/engine-api/types/filters" "github.com/golang/glog" + utilerrors "k8s.io/apimachinery/pkg/util/errors" runtimeapi "k8s.io/kubernetes/pkg/kubelet/api/v1alpha1/runtime" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" "k8s.io/kubernetes/pkg/kubelet/dockertools" @@ -77,7 +78,12 @@ func (ds *dockerService) RunPodSandbox(config *runtimeapi.PodSandboxConfig) (str return "", fmt.Errorf("failed to create a sandbox for pod %q: %v", config.Metadata.Name, err) } - // Step 3: Start the sandbox container. + // Step 3: Create Sandbox Checkpoint. + if err = ds.checkpointHandler.CreateCheckpoint(createResp.ID, constructPodSandboxCheckpoint(config)); err != nil { + return createResp.ID, err + } + + // Step 4: Start the sandbox container. 
// Assume kubelet's garbage collector would remove the sandbox later, if // startContainer failed. err = ds.client.StartContainer(createResp.ID) @@ -88,7 +94,7 @@ func (ds *dockerService) RunPodSandbox(config *runtimeapi.PodSandboxConfig) (str return createResp.ID, nil } - // Step 4: Setup networking for the sandbox. + // Step 5: Setup networking for the sandbox. // All pod networking is setup by a CNI plugin discovered at startup time. // This plugin assigns the pod ip, sets up routes inside the sandbox, // creates interfaces etc. In theory, its jurisdiction ends with pod @@ -107,30 +113,61 @@ func (ds *dockerService) RunPodSandbox(config *runtimeapi.PodSandboxConfig) (str // better to cut our losses assuming an out of band GC routine will cleanup // after us? func (ds *dockerService) StopPodSandbox(podSandboxID string) error { - status, err := ds.PodSandboxStatus(podSandboxID) - if err != nil { - return fmt.Errorf("Failed to get sandbox status: %v", err) - } - if nsOpts := status.GetLinux().GetNamespaces().GetOptions(); nsOpts != nil && !nsOpts.HostNetwork { + var namespace, name string + needNetworkTearDown := false + + status, statusErr := ds.PodSandboxStatus(podSandboxID) + if statusErr == nil { + nsOpts := status.GetLinux().GetNamespaces().GetOptions() + needNetworkTearDown = nsOpts != nil && !nsOpts.HostNetwork m := status.GetMetadata() + namespace = m.Namespace + name = m.Name + } else { + checkpoint, err := ds.checkpointHandler.GetCheckpoint(podSandboxID) + if err != nil { + glog.Errorf("Failed to get checkpoint for sandbox %q: %v", podSandboxID, err) + return fmt.Errorf("failed to get sandbox status: %v", statusErr) + } + namespace = checkpoint.Namespace + name = checkpoint.Name + // Always trigger network plugin to tear down + needNetworkTearDown = true + } + + if needNetworkTearDown { cID := kubecontainer.BuildContainerID(runtimeName, podSandboxID) - if err := ds.networkPlugin.TearDownPod(m.Namespace, m.Name, cID); err != nil { + if err := 
ds.networkPlugin.TearDownPod(namespace, name, cID); err != nil { // TODO: Figure out a way to retry this error. We can't // right now because the plugin throws errors when it doesn't find // eth0, which might not exist for various reasons (setup failed, // conf changed etc). In theory, it should teardown everything else // so there's no need to retry. - glog.Errorf("Failed to teardown sandbox %v for pod %v/%v: %v", m.Namespace, m.Name, podSandboxID, err) + glog.Errorf("Failed to teardown sandbox %q for pod %s/%s: %v", podSandboxID, namespace, name, err) } } - return ds.client.StopContainer(podSandboxID, defaultSandboxGracePeriod) + if err := ds.client.StopContainer(podSandboxID, defaultSandboxGracePeriod); err != nil { + glog.Errorf("Failed to stop sandbox %q: %v", podSandboxID, err) + // Do not return error if the container does not exist + if !dockertools.IsContainerNotFoundError(err) { + return err + } + } + return nil // TODO: Stop all running containers in the sandbox. } // RemovePodSandbox removes the sandbox. If there are running containers in the // sandbox, they should be forcibly removed. func (ds *dockerService) RemovePodSandbox(podSandboxID string) error { - return ds.client.RemoveContainer(podSandboxID, dockertypes.ContainerRemoveOptions{RemoveVolumes: true}) + var errs []error + if err := ds.client.RemoveContainer(podSandboxID, dockertypes.ContainerRemoveOptions{RemoveVolumes: true}); err != nil && !dockertools.IsContainerNotFoundError(err) { + errs = append(errs, err) + } + if err := ds.checkpointHandler.RemoveCheckpoint(podSandboxID); err != nil { + errs = append(errs, err) + } + return utilerrors.NewAggregate(errs) // TODO: remove all containers in the sandbox. } @@ -275,9 +312,11 @@ func (ds *dockerService) ListPodSandbox(filter *runtimeapi.PodSandboxFilter) ([] // Convert docker containers to runtime api sandboxes. 
result := []*runtimeapi.PodSandbox{} + // using map as set + sandboxIDs := make(map[string]bool) for i := range containers { c := containers[i] - converted, err := toRuntimeAPISandbox(&c) + converted, err := containerToRuntimeAPISandbox(&c) if err != nil { glog.V(4).Infof("Unable to convert docker to runtime API sandbox: %v", err) continue @@ -285,9 +324,35 @@ func (ds *dockerService) ListPodSandbox(filter *runtimeapi.PodSandboxFilter) ([] if filterOutReadySandboxes && converted.State == runtimeapi.PodSandboxState_SANDBOX_READY { continue } - + sandboxIDs[converted.Id] = true result = append(result, converted) } + + // Include sandbox that could only be found with its checkpoint if no filter is applied + // These PodSandbox will only include PodSandboxID, Name, Namespace. + // These PodSandbox will be in PodSandboxState_SANDBOX_NOTREADY state. + if filter == nil { + checkpoints, err := ds.checkpointHandler.ListCheckpoints() + if err != nil { + glog.Errorf("Failed to list checkpoints: %v", err) + } + for _, id := range checkpoints { + if _, ok := sandboxIDs[id]; ok { + continue + } + checkpoint, err := ds.checkpointHandler.GetCheckpoint(id) + if err != nil { + glog.Errorf("Failed to retrieve checkpoint for sandbox %q: %v", id, err) + + if err == CorruptCheckpointError { + glog.V(2).Infof("Removing corrupted checkpoint %q: %+v", id, *checkpoint) + ds.checkpointHandler.RemoveCheckpoint(id) + } + continue + } + result = append(result, checkpointToRuntimeAPISandbox(id, checkpoint)) + } + } return result, nil } @@ -383,3 +448,27 @@ func setSandboxResources(hc *dockercontainer.HostConfig) { // TODO: Get rid of the dependency on kubelet internal package. 
hc.OomScoreAdj = qos.PodInfraOOMAdj } + +func constructPodSandboxCheckpoint(config *runtimeapi.PodSandboxConfig) *PodSandboxCheckpoint { + checkpoint := NewPodSandboxCheckpoint(config.Metadata.Namespace, config.Metadata.Name) + for _, pm := range config.GetPortMappings() { + proto := toCheckpointProtocol(pm.Protocol) + checkpoint.Data.PortMappings = append(checkpoint.Data.PortMappings, &PortMapping{ + HostPort: &pm.HostPort, + ContainerPort: &pm.ContainerPort, + Protocol: &proto, + }) + } + return checkpoint +} + +func toCheckpointProtocol(protocol runtimeapi.Protocol) Protocol { + switch protocol { + case runtimeapi.Protocol_TCP: + return protocolTCP + case runtimeapi.Protocol_UDP: + return protocolUDP + } + glog.Warningf("Unknown protocol %q: defaulting to TCP", protocol) + return protocolTCP +} diff --git a/pkg/kubelet/dockershim/docker_service.go b/pkg/kubelet/dockershim/docker_service.go index a75b883a7cd..5a3650a5391 100644 --- a/pkg/kubelet/dockershim/docker_service.go +++ b/pkg/kubelet/dockershim/docker_service.go @@ -48,6 +48,9 @@ const ( defaultSeccompProfile = "unconfined" + // dockershimRootDir is the root directory for dockershim + dockershimRootDir = "/var/lib/dockershim" + // Internal docker labels used to identify whether a container is a sandbox // or a regular container. // TODO: This is not backward compatible with older containers. We will @@ -112,7 +115,8 @@ func NewDockerService(client dockertools.DockerInterface, seccompProfileRoot str client: client, execHandler: execHandler, }, - containerManager: cm.NewContainerManager(cgroupsName, client), + containerManager: cm.NewContainerManager(cgroupsName, client), + checkpointHandler: NewPersistentCheckpointHandler(), } if streamingConfig != nil { var err error @@ -173,7 +177,8 @@ type dockerService struct { networkPlugin network.NetworkPlugin containerManager cm.ContainerManager // cgroup driver used by Docker runtime. 
- cgroupDriver string + cgroupDriver string + checkpointHandler CheckpointHandler } // Version returns the runtime name, runtime version and runtime API version diff --git a/pkg/kubelet/dockershim/docker_service_test.go b/pkg/kubelet/dockershim/docker_service_test.go index fa6401b57f4..8255a5a16eb 100644 --- a/pkg/kubelet/dockershim/docker_service_test.go +++ b/pkg/kubelet/dockershim/docker_service_test.go @@ -41,7 +41,7 @@ func newTestNetworkPlugin(t *testing.T) *mock_network.MockNetworkPlugin { func newTestDockerService() (*dockerService, *dockertools.FakeDockerClient, *clock.FakeClock) { fakeClock := clock.NewFakeClock(time.Time{}) c := dockertools.NewFakeDockerClient().WithClock(fakeClock) - return &dockerService{client: c, os: &containertest.FakeOS{}, networkPlugin: &network.NoopNetworkPlugin{}}, c, fakeClock + return &dockerService{client: c, os: &containertest.FakeOS{}, networkPlugin: &network.NoopNetworkPlugin{}, checkpointHandler: NewTestPersistentCheckpointHandler()}, c, fakeClock } // TestStatus tests the runtime status logic. 
diff --git a/pkg/kubelet/dockershim/testing/BUILD b/pkg/kubelet/dockershim/testing/BUILD new file mode 100644 index 00000000000..5eb8bcd4466 --- /dev/null +++ b/pkg/kubelet/dockershim/testing/BUILD @@ -0,0 +1,27 @@ +package(default_visibility = ["//visibility:public"]) + +licenses(["notice"]) + +load( + "@io_bazel_rules_go//go:def.bzl", + "go_library", +) + +go_library( + name = "go_default_library", + srcs = ["util.go"], + tags = ["automanaged"], +) + +filegroup( + name = "package-srcs", + srcs = glob(["**"]), + tags = ["automanaged"], + visibility = ["//visibility:private"], +) + +filegroup( + name = "all-srcs", + srcs = [":package-srcs"], + tags = ["automanaged"], +) diff --git a/pkg/kubelet/network/kubenet/kubenet_linux.go b/pkg/kubelet/network/kubenet/kubenet_linux.go index 97a8752b1dd..5cab15784b8 100644 --- a/pkg/kubelet/network/kubenet/kubenet_linux.go +++ b/pkg/kubelet/network/kubenet/kubenet_linux.go @@ -727,7 +727,7 @@ func podIsExited(p *kubecontainer.Pod) bool { func (plugin *kubenetNetworkPlugin) buildCNIRuntimeConf(ifName string, id kubecontainer.ContainerID) (*libcni.RuntimeConf, error) { netnsPath, err := plugin.host.GetNetNS(id.ID) if err != nil { - return nil, fmt.Errorf("Kubenet failed to retrieve network namespace path: %v", err) + glog.Errorf("Kubenet failed to retrieve network namespace path: %v", err) } return &libcni.RuntimeConf{