diff --git a/hack/verify-flags/exceptions.txt b/hack/verify-flags/exceptions.txt index 12be24fe8f1..600b2ed24d2 100644 --- a/hack/verify-flags/exceptions.txt +++ b/hack/verify-flags/exceptions.txt @@ -120,6 +120,7 @@ pkg/kubelet/api/v1alpha1/runtime/api.proto: int64 oom_score_adj = 5; pkg/kubelet/api/v1alpha1/runtime/api.proto: string pod_cidr = 1; pkg/kubelet/cm/container_manager_linux.go: glog.V(3).Infof("Failed to apply oom_score_adj %d for pid %d: %v", oomScoreAdj, pid, err) pkg/kubelet/cm/container_manager_linux.go: glog.V(5).Infof("attempting to apply oom_score_adj of %d to pid %d", oomScoreAdj, pid) +pkg/kubelet/dockershim/docker_checkpoint.go: ContainerPort *int32 `json:"container_port,omitempty"` pkg/kubelet/network/hairpin/hairpin.go: hairpinModeRelativePath = "hairpin_mode" pkg/kubelet/qos/policy_test.go: t.Errorf("oom_score_adj should be between %d and %d, but was %d", test.lowOOMScoreAdj, test.highOOMScoreAdj, oomScoreAdj) pkg/kubelet/qos/policy_test.go: highOOMScoreAdj int // The min oom_score_adj score the container should be assigned. 
diff --git a/pkg/kubelet/dockershim/BUILD b/pkg/kubelet/dockershim/BUILD index 77ac8e9fde5..e765e3d8cf7 100644 --- a/pkg/kubelet/dockershim/BUILD +++ b/pkg/kubelet/dockershim/BUILD @@ -11,8 +11,10 @@ load( go_library( name = "go_default_library", srcs = [ + "checkpoint_store.go", "convert.go", "doc.go", + "docker_checkpoint.go", "docker_container.go", "docker_image.go", "docker_sandbox.go", @@ -41,6 +43,7 @@ go_library( "//pkg/kubelet/server/streaming:go_default_library", "//pkg/kubelet/types:go_default_library", "//pkg/kubelet/util/ioutils:go_default_library", + "//pkg/util/hash:go_default_library", "//pkg/util/term:go_default_library", "//vendor:github.com/docker/engine-api/types", "//vendor:github.com/docker/engine-api/types/container", @@ -49,13 +52,16 @@ go_library( "//vendor:github.com/docker/engine-api/types/versions", "//vendor:github.com/docker/go-connections/nat", "//vendor:github.com/golang/glog", + "//vendor:k8s.io/apimachinery/pkg/util/errors", ], ) go_test( name = "go_default_test", srcs = [ + "checkpoint_store_test.go", "convert_test.go", + "docker_checkpoint_test.go", "docker_container_test.go", "docker_image_test.go", "docker_sandbox_test.go", @@ -71,6 +77,7 @@ go_test( "//pkg/kubelet/api/v1alpha1/runtime:go_default_library", "//pkg/kubelet/container:go_default_library", "//pkg/kubelet/container/testing:go_default_library", + "//pkg/kubelet/dockershim/testing:go_default_library", "//pkg/kubelet/dockertools:go_default_library", "//pkg/kubelet/dockertools/securitycontext:go_default_library", "//pkg/kubelet/network:go_default_library", @@ -99,6 +106,7 @@ filegroup( ":package-srcs", "//pkg/kubelet/dockershim/cm:all-srcs", "//pkg/kubelet/dockershim/remote:all-srcs", + "//pkg/kubelet/dockershim/testing:all-srcs", ], tags = ["automanaged"], ) diff --git a/pkg/kubelet/dockershim/checkpoint_store.go b/pkg/kubelet/dockershim/checkpoint_store.go new file mode 100644 index 00000000000..32215174906 --- /dev/null +++ b/pkg/kubelet/dockershim/checkpoint_store.go 
@@ -0,0 +1,114 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package dockershim + +import ( + "fmt" + "io/ioutil" + "os" + "path/filepath" + "regexp" + "strings" +) + +const ( + tmpPrefix = "." + tmpSuffix = ".tmp" + keyMaxLength = 250 +) + +var keyRegex = regexp.MustCompile("^[a-zA-Z0-9]+$") + +// CheckpointStore provides the interface for checkpoint storage backend. +// CheckpointStore must be thread-safe +type CheckpointStore interface { + // key must contain one or more characters in [A-Za-z0-9] + // Write persists a checkpoint with key + Write(key string, data []byte) error + // Read retrieves a checkpoint with key + Read(key string) ([]byte, error) + // Delete deletes a checkpoint with key + // Delete must not return error if checkpoint does not exist + Delete(key string) error + // List lists all keys of existing checkpoints + List() ([]string, error) +} + +// FileStore is an implementation of CheckpointStore interface which stores checkpoint in files. 
+type FileStore struct { + // path to the base directory for storing checkpoint files + path string +} + +func (fstore *FileStore) Write(key string, data []byte) error { + if err := validateKey(key); err != nil { + return err + } + if _, err := os.Stat(fstore.path); err != nil { + // if directory already exists, proceed + if err = os.MkdirAll(fstore.path, 0755); err != nil && !os.IsExist(err) { + return err + } + } + tmpfile := filepath.Join(fstore.path, fmt.Sprintf("%s%s%s", tmpPrefix, key, tmpSuffix)) + if err := ioutil.WriteFile(tmpfile, data, 0644); err != nil { + return err + } + return os.Rename(tmpfile, fstore.getCheckpointPath(key)) +} + +func (fstore *FileStore) Read(key string) ([]byte, error) { + if err := validateKey(key); err != nil { + return nil, err + } + return ioutil.ReadFile(fstore.getCheckpointPath(key)) +} + +func (fstore *FileStore) Delete(key string) error { + if err := validateKey(key); err != nil { + return err + } + if err := os.Remove(fstore.getCheckpointPath(key)); err != nil && !os.IsNotExist(err) { + return err + } + return nil +} + +func (fstore *FileStore) List() ([]string, error) { + keys := make([]string, 0) + files, err := ioutil.ReadDir(fstore.path) + if err != nil { + return keys, err + } + for _, f := range files { + if !strings.HasPrefix(f.Name(), tmpPrefix) { + keys = append(keys, f.Name()) + } + } + return keys, nil +} + +func (fstore *FileStore) getCheckpointPath(key string) string { + return filepath.Join(fstore.path, key) +} + +func validateKey(key string) error { + if len(key) <= keyMaxLength && keyRegex.MatchString(key) { + return nil + } + return fmt.Errorf("checkpoint key %q is not valid.", key) +} diff --git a/pkg/kubelet/dockershim/checkpoint_store_test.go b/pkg/kubelet/dockershim/checkpoint_store_test.go new file mode 100644 index 00000000000..44731bc26c8 --- /dev/null +++ b/pkg/kubelet/dockershim/checkpoint_store_test.go @@ -0,0 +1,156 @@ +/* +Copyright 2017 The Kubernetes Authors. 
+ +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package dockershim + +import ( + "io/ioutil" + "os" + "sort" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestFileStore(t *testing.T) { + path, err := ioutil.TempDir("", "FileStore") + assert.NoError(t, err) + defer cleanUpTestPath(t, path) + store := &FileStore{path: path} + + Checkpoints := []struct { + key string + data string + expectErr bool + }{ + { + "id1", + "data1", + false, + }, + { + "id2", + "data2", + false, + }, + { + "/id1", + "data1", + true, + }, + { + ".id1", + "data1", + true, + }, + { + " ", + "data2", + true, + }, + { + "___", + "data2", + true, + }, + } + + // Test Add Checkpoint + for _, c := range Checkpoints { + _, err = store.Read(c.key) + assert.Error(t, err) + + err = store.Write(c.key, []byte(c.data)) + if c.expectErr { + assert.Error(t, err) + continue + } else { + assert.NoError(t, err) + } + + // Test Read Checkpoint + data, err := store.Read(c.key) + assert.NoError(t, err) + assert.Equal(t, string(data), c.data) + } + + // Test list checkpoints. 
+ keys, err := store.List() + assert.NoError(t, err) + sort.Strings(keys) + assert.Equal(t, keys, []string{"id1", "id2"}) + + // Test Delete Checkpoint + for _, c := range Checkpoints { + if c.expectErr { + continue + } + + err = store.Delete(c.key) + assert.NoError(t, err) + _, err = store.Read(c.key) + assert.Error(t, err) + } + + // Test delete non existed checkpoint + err = store.Delete("id1") + assert.NoError(t, err) + + // Test list checkpoints. + keys, err = store.List() + assert.NoError(t, err) + assert.Equal(t, len(keys), 0) +} + +func TestIsValidKey(t *testing.T) { + testcases := []struct { + key string + valid bool + }{ + { + " ", + false, + }, + { + "/foo/bar", + false, + }, + { + ".foo", + false, + }, + { + "a78768279290d33d0b82eaea43cb8346f500057cb5bd250e88c97a5585385d66", + true, + }, + } + + for _, tc := range testcases { + if tc.valid { + assert.NoError(t, validateKey(tc.key)) + } else { + assert.Error(t, validateKey(tc.key)) + } + } +} + +func cleanUpTestPath(t *testing.T, path string) { + if _, err := os.Stat(path); !os.IsNotExist(err) { + if err := os.RemoveAll(path); err != nil { + assert.NoError(t, err, "Failed to delete test directory: %v", err) + } + } +} diff --git a/pkg/kubelet/dockershim/convert.go b/pkg/kubelet/dockershim/convert.go index c8de3df417a..e77491aefd5 100644 --- a/pkg/kubelet/dockershim/convert.go +++ b/pkg/kubelet/dockershim/convert.go @@ -148,7 +148,7 @@ func toRuntimeAPISandboxState(state string) runtimeapi.PodSandboxState { } } -func toRuntimeAPISandbox(c *dockertypes.Container) (*runtimeapi.PodSandbox, error) { +func containerToRuntimeAPISandbox(c *dockertypes.Container) (*runtimeapi.PodSandbox, error) { state := toRuntimeAPISandboxState(c.Status) if len(c.Names) == 0 { return nil, fmt.Errorf("unexpected empty sandbox name: %+v", c) @@ -169,3 +169,15 @@ func toRuntimeAPISandbox(c *dockertypes.Container) (*runtimeapi.PodSandbox, erro Annotations: annotations, }, nil } + +func checkpointToRuntimeAPISandbox(id string, 
checkpoint *PodSandboxCheckpoint) *runtimeapi.PodSandbox { + state := runtimeapi.PodSandboxState_SANDBOX_NOTREADY + return &runtimeapi.PodSandbox{ + Id: id, + Metadata: &runtimeapi.PodSandboxMetadata{ + Name: checkpoint.Name, + Namespace: checkpoint.Namespace, + }, + State: state, + } +} diff --git a/pkg/kubelet/dockershim/docker_checkpoint.go b/pkg/kubelet/dockershim/docker_checkpoint.go new file mode 100644 index 00000000000..16f95f06259 --- /dev/null +++ b/pkg/kubelet/dockershim/docker_checkpoint.go @@ -0,0 +1,146 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package dockershim + +import ( + "encoding/json" + "fmt" + "hash/fnv" + "path/filepath" + + "github.com/golang/glog" + hashutil "k8s.io/kubernetes/pkg/util/hash" +) + +const ( + // default directory to store pod sandbox checkpoint files + sandboxCheckpointDir = "sandbox" + protocolTCP = Protocol("tcp") + protocolUDP = Protocol("udp") + schemaVersion = "v1" +) + +var CorruptCheckpointError = fmt.Errorf("checkpoint is corrupted.") + +type Protocol string + +// PortMapping is the port mapping configurations of a sandbox. +type PortMapping struct { + // Protocol of the port mapping. + Protocol *Protocol `json:"protocol,omitempty"` + // Port number within the container. + ContainerPort *int32 `json:"container_port,omitempty"` + // Port number on the host. 
+ HostPort *int32 `json:"host_port,omitempty"` +} + +// CheckpointData contains all types of data that can be stored in the checkpoint. +type CheckpointData struct { + PortMappings []*PortMapping `json:"port_mappings,omitempty"` +} + +// PodSandboxCheckpoint is the checkpoint structure for a sandbox +type PodSandboxCheckpoint struct { + // Version of the pod sandbox checkpoint schema. + Version string `json:"version"` + // Pod name of the sandbox. Same as the pod name in the PodSpec. + Name string `json:"name"` + // Pod namespace of the sandbox. Same as the pod namespace in the PodSpec. + Namespace string `json:"namespace"` + // Data to checkpoint for pod sandbox. + Data *CheckpointData `json:"data,omitempty"` + // Checksum is calculated with fnv hash of the checkpoint object with checksum field set to be zero + CheckSum uint64 `json:"checksum"` +} + +// CheckpointHandler provides the interface to manage PodSandbox checkpoint +type CheckpointHandler interface { + // CreateCheckpoint persists sandbox checkpoint in CheckpointStore. + CreateCheckpoint(podSandboxID string, checkpoint *PodSandboxCheckpoint) error + // GetCheckpoint retrieves sandbox checkpoint from CheckpointStore. + GetCheckpoint(podSandboxID string) (*PodSandboxCheckpoint, error) + // RemoveCheckpoint removes sandbox checkpoint from CheckpointStore. + // WARNING: RemoveCheckpoint will not return error if checkpoint does not exist. + RemoveCheckpoint(podSandboxID string) error + // ListCheckpoints returns the list of existing checkpoints. + ListCheckpoints() ([]string, error) +} + +// PersistentCheckpointHandler is an implementation of CheckpointHandler. 
It persists checkpoint in CheckpointStore +type PersistentCheckpointHandler struct { + store CheckpointStore +} + +func NewPersistentCheckpointHandler() CheckpointHandler { + return &PersistentCheckpointHandler{store: &FileStore{path: filepath.Join(dockershimRootDir, sandboxCheckpointDir)}} +} + +func (handler *PersistentCheckpointHandler) CreateCheckpoint(podSandboxID string, checkpoint *PodSandboxCheckpoint) error { + checkpoint.CheckSum = calculateChecksum(*checkpoint) + blob, err := json.Marshal(checkpoint) + if err != nil { + return err + } + return handler.store.Write(podSandboxID, blob) +} + +func (handler *PersistentCheckpointHandler) GetCheckpoint(podSandboxID string) (*PodSandboxCheckpoint, error) { + blob, err := handler.store.Read(podSandboxID) + if err != nil { + return nil, err + } + var checkpoint PodSandboxCheckpoint + //TODO: unmarshal into a struct with just Version, check version, unmarshal into versioned type. + err = json.Unmarshal(blob, &checkpoint) + if err != nil { + glog.Errorf("Failed to unmarshal checkpoint %q. Checkpoint content: %q. 
ErrMsg: %v", podSandboxID, string(blob), err) + return &checkpoint, CorruptCheckpointError + } + if checkpoint.CheckSum != calculateChecksum(checkpoint) { + glog.Errorf("Checksum of checkpoint %q is not valid", podSandboxID) + return &checkpoint, CorruptCheckpointError + } + return &checkpoint, nil +} + +func (handler *PersistentCheckpointHandler) RemoveCheckpoint(podSandboxID string) error { + return handler.store.Delete(podSandboxID) +} + +func (handler *PersistentCheckpointHandler) ListCheckpoints() ([]string, error) { + keys, err := handler.store.List() + if err != nil { + return []string{}, fmt.Errorf("failed to list checkpoint store: %v", err) + } + return keys, nil +} + +func NewPodSandboxCheckpoint(namespace, name string) *PodSandboxCheckpoint { + return &PodSandboxCheckpoint{ + Version: schemaVersion, + Namespace: namespace, + Name: name, + Data: &CheckpointData{}, + } +} + +func calculateChecksum(checkpoint PodSandboxCheckpoint) uint64 { + checkpoint.CheckSum = 0 + hash := fnv.New32a() + hashutil.DeepHashObject(hash, checkpoint) + return uint64(hash.Sum32()) +} diff --git a/pkg/kubelet/dockershim/docker_checkpoint_test.go b/pkg/kubelet/dockershim/docker_checkpoint_test.go new file mode 100644 index 00000000000..477c9204d90 --- /dev/null +++ b/pkg/kubelet/dockershim/docker_checkpoint_test.go @@ -0,0 +1,97 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package dockershim + +import ( + "sort" + "testing" + + "github.com/stretchr/testify/assert" + utilstore "k8s.io/kubernetes/pkg/kubelet/dockershim/testing" +) + +func NewTestPersistentCheckpointHandler() CheckpointHandler { + return &PersistentCheckpointHandler{store: utilstore.NewMemStore()} +} + +func TestPersistentCheckpointHandler(t *testing.T) { + var err error + handler := NewTestPersistentCheckpointHandler() + port80 := int32(80) + port443 := int32(443) + proto := protocolTCP + + checkpoint1 := NewPodSandboxCheckpoint("ns1", "sandbox1") + checkpoint1.Data.PortMappings = []*PortMapping{ + { + &proto, + &port80, + &port80, + }, + { + &proto, + &port443, + &port443, + }, + } + + checkpoints := []struct { + podSandboxID string + checkpoint *PodSandboxCheckpoint + }{ + { + "id1", + checkpoint1, + }, + { + "id2", + NewPodSandboxCheckpoint("ns2", "sandbox2"), + }, + } + + for _, tc := range checkpoints { + // Test CreateCheckpoints + err = handler.CreateCheckpoint(tc.podSandboxID, tc.checkpoint) + assert.NoError(t, err) + + // Test GetCheckpoints + checkpoint, err := handler.GetCheckpoint(tc.podSandboxID) + assert.NoError(t, err) + assert.Equal(t, *checkpoint, *tc.checkpoint) + } + // Test ListCheckpoints + keys, err := handler.ListCheckpoints() + assert.NoError(t, err) + sort.Strings(keys) + assert.Equal(t, keys, []string{"id1", "id2"}) + + // Test RemoveCheckpoints + err = handler.RemoveCheckpoint("id1") + assert.NoError(t, err) + // Test Remove Nonexisted Checkpoints + err = handler.RemoveCheckpoint("id1") + assert.NoError(t, err) + + // Test ListCheckpoints + keys, err = handler.ListCheckpoints() + assert.NoError(t, err) + assert.Equal(t, keys, []string{"id2"}) + + // Test Get NonExisted Checkpoint + _, err = handler.GetCheckpoint("id1") + assert.Error(t, err) +} diff --git a/pkg/kubelet/dockershim/docker_sandbox.go b/pkg/kubelet/dockershim/docker_sandbox.go index def3268883f..535ed569731 100644 --- a/pkg/kubelet/dockershim/docker_sandbox.go +++ 
b/pkg/kubelet/dockershim/docker_sandbox.go @@ -24,6 +24,7 @@ import ( dockerfilters "github.com/docker/engine-api/types/filters" "github.com/golang/glog" + utilerrors "k8s.io/apimachinery/pkg/util/errors" runtimeapi "k8s.io/kubernetes/pkg/kubelet/api/v1alpha1/runtime" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" "k8s.io/kubernetes/pkg/kubelet/dockertools" @@ -77,7 +78,12 @@ func (ds *dockerService) RunPodSandbox(config *runtimeapi.PodSandboxConfig) (str return "", fmt.Errorf("failed to create a sandbox for pod %q: %v", config.Metadata.Name, err) } - // Step 3: Start the sandbox container. + // Step 3: Create Sandbox Checkpoint. + if err = ds.checkpointHandler.CreateCheckpoint(createResp.ID, constructPodSandboxCheckpoint(config)); err != nil { + return createResp.ID, err + } + + // Step 4: Start the sandbox container. // Assume kubelet's garbage collector would remove the sandbox later, if // startContainer failed. err = ds.client.StartContainer(createResp.ID) @@ -88,7 +94,7 @@ func (ds *dockerService) RunPodSandbox(config *runtimeapi.PodSandboxConfig) (str return createResp.ID, nil } - // Step 4: Setup networking for the sandbox. + // Step 5: Setup networking for the sandbox. // All pod networking is setup by a CNI plugin discovered at startup time. // This plugin assigns the pod ip, sets up routes inside the sandbox, // creates interfaces etc. In theory, its jurisdiction ends with pod @@ -107,30 +113,61 @@ func (ds *dockerService) RunPodSandbox(config *runtimeapi.PodSandboxConfig) (str // better to cut our losses assuming an out of band GC routine will cleanup // after us? 
func (ds *dockerService) StopPodSandbox(podSandboxID string) error { - status, err := ds.PodSandboxStatus(podSandboxID) - if err != nil { - return fmt.Errorf("Failed to get sandbox status: %v", err) - } - if nsOpts := status.GetLinux().GetNamespaces().GetOptions(); nsOpts != nil && !nsOpts.HostNetwork { + var namespace, name string + needNetworkTearDown := false + + status, statusErr := ds.PodSandboxStatus(podSandboxID) + if statusErr == nil { + nsOpts := status.GetLinux().GetNamespaces().GetOptions() + needNetworkTearDown = nsOpts != nil && !nsOpts.HostNetwork m := status.GetMetadata() + namespace = m.Namespace + name = m.Name + } else { + checkpoint, err := ds.checkpointHandler.GetCheckpoint(podSandboxID) + if err != nil { + glog.Errorf("Failed to get checkpoint for sandbox %q: %v", podSandboxID, err) + return fmt.Errorf("failed to get sandbox status: %v", statusErr) + } + namespace = checkpoint.Namespace + name = checkpoint.Name + // Always trigger network plugin to tear down + needNetworkTearDown = true + } + + if needNetworkTearDown { cID := kubecontainer.BuildContainerID(runtimeName, podSandboxID) - if err := ds.networkPlugin.TearDownPod(m.Namespace, m.Name, cID); err != nil { + if err := ds.networkPlugin.TearDownPod(namespace, name, cID); err != nil { // TODO: Figure out a way to retry this error. We can't // right now because the plugin throws errors when it doesn't find // eth0, which might not exist for various reasons (setup failed, // conf changed etc). In theory, it should teardown everything else // so there's no need to retry. 
- glog.Errorf("Failed to teardown sandbox %v for pod %v/%v: %v", m.Namespace, m.Name, podSandboxID, err) + glog.Errorf("Failed to teardown sandbox %q for pod %s/%s: %v", podSandboxID, namespace, name, err) } } - return ds.client.StopContainer(podSandboxID, defaultSandboxGracePeriod) + if err := ds.client.StopContainer(podSandboxID, defaultSandboxGracePeriod); err != nil { + glog.Errorf("Failed to stop sandbox %q: %v", podSandboxID, err) + // Do not return error if the container does not exist + if !dockertools.IsContainerNotFoundError(err) { + return err + } + } + return nil // TODO: Stop all running containers in the sandbox. } // RemovePodSandbox removes the sandbox. If there are running containers in the // sandbox, they should be forcibly removed. func (ds *dockerService) RemovePodSandbox(podSandboxID string) error { - return ds.client.RemoveContainer(podSandboxID, dockertypes.ContainerRemoveOptions{RemoveVolumes: true}) + var errs []error + if err := ds.client.RemoveContainer(podSandboxID, dockertypes.ContainerRemoveOptions{RemoveVolumes: true}); err != nil && !dockertools.IsContainerNotFoundError(err) { + errs = append(errs, err) + } + if err := ds.checkpointHandler.RemoveCheckpoint(podSandboxID); err != nil { + errs = append(errs, err) + } + return utilerrors.NewAggregate(errs) // TODO: remove all containers in the sandbox. } @@ -275,9 +312,11 @@ func (ds *dockerService) ListPodSandbox(filter *runtimeapi.PodSandboxFilter) ([] // Convert docker containers to runtime api sandboxes. 
result := []*runtimeapi.PodSandbox{} + // using map as set + sandboxIDs := make(map[string]bool) for i := range containers { c := containers[i] - converted, err := toRuntimeAPISandbox(&c) + converted, err := containerToRuntimeAPISandbox(&c) if err != nil { glog.V(4).Infof("Unable to convert docker to runtime API sandbox: %v", err) continue @@ -285,9 +324,35 @@ func (ds *dockerService) ListPodSandbox(filter *runtimeapi.PodSandboxFilter) ([] if filterOutReadySandboxes && converted.State == runtimeapi.PodSandboxState_SANDBOX_READY { continue } - + sandboxIDs[converted.Id] = true result = append(result, converted) } + + // Include sandbox that could only be found with its checkpoint if no filter is applied + // These PodSandbox will only include PodSandboxID, Name, Namespace. + // These PodSandbox will be in PodSandboxState_SANDBOX_NOTREADY state. + if filter == nil { + checkpoints, err := ds.checkpointHandler.ListCheckpoints() + if err != nil { + glog.Errorf("Failed to list checkpoints: %v", err) + } + for _, id := range checkpoints { + if _, ok := sandboxIDs[id]; ok { + continue + } + checkpoint, err := ds.checkpointHandler.GetCheckpoint(id) + if err != nil { + glog.Errorf("Failed to retrieve checkpoint for sandbox %q: %v", id, err) + + if err == CorruptCheckpointError { + glog.V(2).Infof("Removing corrupted checkpoint %q: %+v", id, *checkpoint) + ds.checkpointHandler.RemoveCheckpoint(id) + } + continue + } + result = append(result, checkpointToRuntimeAPISandbox(id, checkpoint)) + } + } return result, nil } @@ -383,3 +448,27 @@ func setSandboxResources(hc *dockercontainer.HostConfig) { // TODO: Get rid of the dependency on kubelet internal package. 
hc.OomScoreAdj = qos.PodInfraOOMAdj } + +func constructPodSandboxCheckpoint(config *runtimeapi.PodSandboxConfig) *PodSandboxCheckpoint { + checkpoint := NewPodSandboxCheckpoint(config.Metadata.Namespace, config.Metadata.Name) + for _, pm := range config.GetPortMappings() { + proto := toCheckpointProtocol(pm.Protocol) + checkpoint.Data.PortMappings = append(checkpoint.Data.PortMappings, &PortMapping{ + HostPort: &pm.HostPort, + ContainerPort: &pm.ContainerPort, + Protocol: &proto, + }) + } + return checkpoint +} + +func toCheckpointProtocol(protocol runtimeapi.Protocol) Protocol { + switch protocol { + case runtimeapi.Protocol_TCP: + return protocolTCP + case runtimeapi.Protocol_UDP: + return protocolUDP + } + glog.Warningf("Unknown protocol %q: defaulting to TCP", protocol) + return protocolTCP +} diff --git a/pkg/kubelet/dockershim/docker_service.go b/pkg/kubelet/dockershim/docker_service.go index a75b883a7cd..5a3650a5391 100644 --- a/pkg/kubelet/dockershim/docker_service.go +++ b/pkg/kubelet/dockershim/docker_service.go @@ -48,6 +48,9 @@ const ( defaultSeccompProfile = "unconfined" + // dockershimRootDir is the root directory for dockershim + dockershimRootDir = "/var/lib/dockershim" + // Internal docker labels used to identify whether a container is a sandbox // or a regular container. // TODO: This is not backward compatible with older containers. We will @@ -112,7 +115,8 @@ func NewDockerService(client dockertools.DockerInterface, seccompProfileRoot str client: client, execHandler: execHandler, }, - containerManager: cm.NewContainerManager(cgroupsName, client), + containerManager: cm.NewContainerManager(cgroupsName, client), + checkpointHandler: NewPersistentCheckpointHandler(), } if streamingConfig != nil { var err error @@ -173,7 +177,8 @@ type dockerService struct { networkPlugin network.NetworkPlugin containerManager cm.ContainerManager // cgroup driver used by Docker runtime. 
- cgroupDriver string + cgroupDriver string + checkpointHandler CheckpointHandler } // Version returns the runtime name, runtime version and runtime API version diff --git a/pkg/kubelet/dockershim/docker_service_test.go b/pkg/kubelet/dockershim/docker_service_test.go index fa6401b57f4..8255a5a16eb 100644 --- a/pkg/kubelet/dockershim/docker_service_test.go +++ b/pkg/kubelet/dockershim/docker_service_test.go @@ -41,7 +41,7 @@ func newTestNetworkPlugin(t *testing.T) *mock_network.MockNetworkPlugin { func newTestDockerService() (*dockerService, *dockertools.FakeDockerClient, *clock.FakeClock) { fakeClock := clock.NewFakeClock(time.Time{}) c := dockertools.NewFakeDockerClient().WithClock(fakeClock) - return &dockerService{client: c, os: &containertest.FakeOS{}, networkPlugin: &network.NoopNetworkPlugin{}}, c, fakeClock + return &dockerService{client: c, os: &containertest.FakeOS{}, networkPlugin: &network.NoopNetworkPlugin{}, checkpointHandler: NewTestPersistentCheckpointHandler()}, c, fakeClock } // TestStatus tests the runtime status logic. diff --git a/pkg/kubelet/dockershim/testing/BUILD b/pkg/kubelet/dockershim/testing/BUILD new file mode 100644 index 00000000000..5eb8bcd4466 --- /dev/null +++ b/pkg/kubelet/dockershim/testing/BUILD @@ -0,0 +1,27 @@ +package(default_visibility = ["//visibility:public"]) + +licenses(["notice"]) + +load( + "@io_bazel_rules_go//go:def.bzl", + "go_library", +) + +go_library( + name = "go_default_library", + srcs = ["util.go"], + tags = ["automanaged"], +) + +filegroup( + name = "package-srcs", + srcs = glob(["**"]), + tags = ["automanaged"], + visibility = ["//visibility:private"], +) + +filegroup( + name = "all-srcs", + srcs = [":package-srcs"], + tags = ["automanaged"], +) diff --git a/pkg/kubelet/dockershim/testing/util.go b/pkg/kubelet/dockershim/testing/util.go new file mode 100644 index 00000000000..cf870a99e7a --- /dev/null +++ b/pkg/kubelet/dockershim/testing/util.go @@ -0,0 +1,66 @@ +/* +Copyright 2017 The Kubernetes Authors. 
+ +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package testing + +import ( + "fmt" + "sync" +) + +// MemStore is an implementation of CheckpointStore interface which stores checkpoint in memory. +type MemStore struct { + mem map[string][]byte + sync.Mutex +} + +func NewMemStore() *MemStore { + return &MemStore{mem: make(map[string][]byte)} +} + +func (mstore *MemStore) Write(key string, data []byte) error { + mstore.Lock() + defer mstore.Unlock() + mstore.mem[key] = data + return nil +} + +func (mstore *MemStore) Read(key string) ([]byte, error) { + mstore.Lock() + defer mstore.Unlock() + data, ok := mstore.mem[key] + if !ok { + return nil, fmt.Errorf("checkpoint %q could not be found", key) + } + return data, nil +} + +func (mstore *MemStore) Delete(key string) error { + mstore.Lock() + defer mstore.Unlock() + delete(mstore.mem, key) + return nil +} + +func (mstore *MemStore) List() ([]string, error) { + mstore.Lock() + defer mstore.Unlock() + keys := make([]string, 0) + for key := range mstore.mem { + keys = append(keys, key) + } + return keys, nil +} diff --git a/pkg/kubelet/network/kubenet/kubenet_linux.go b/pkg/kubelet/network/kubenet/kubenet_linux.go index 97a8752b1dd..5cab15784b8 100644 --- a/pkg/kubelet/network/kubenet/kubenet_linux.go +++ b/pkg/kubelet/network/kubenet/kubenet_linux.go @@ -727,7 +727,7 @@ func podIsExited(p *kubecontainer.Pod) bool { func (plugin *kubenetNetworkPlugin) buildCNIRuntimeConf(ifName string, id kubecontainer.ContainerID) (*libcni.RuntimeConf, 
error) { netnsPath, err := plugin.host.GetNetNS(id.ID) if err != nil { - return nil, fmt.Errorf("Kubenet failed to retrieve network namespace path: %v", err) + glog.Errorf("Kubenet failed to retrieve network namespace path: %v", err) } return &libcni.RuntimeConf{