From 344d2f591f114a59c950562c01d06aa47b064c52 Mon Sep 17 00:00:00 2001 From: Minhan Xia Date: Wed, 4 Jan 2017 17:36:17 -0800 Subject: [PATCH] add checkpoint structures for dockershim --- pkg/kubelet/dockershim/checkpoint_store.go | 114 +++++++++++++ .../dockershim/checkpoint_store_test.go | 161 ++++++++++++++++++ pkg/kubelet/dockershim/docker_checkpoint.go | 144 ++++++++++++++++ .../dockershim/docker_checkpoint_test.go | 98 +++++++++++ pkg/kubelet/dockershim/testing/util.go | 66 +++++++ 5 files changed, 583 insertions(+) create mode 100644 pkg/kubelet/dockershim/checkpoint_store.go create mode 100644 pkg/kubelet/dockershim/checkpoint_store_test.go create mode 100644 pkg/kubelet/dockershim/docker_checkpoint.go create mode 100644 pkg/kubelet/dockershim/docker_checkpoint_test.go create mode 100644 pkg/kubelet/dockershim/testing/util.go diff --git a/pkg/kubelet/dockershim/checkpoint_store.go b/pkg/kubelet/dockershim/checkpoint_store.go new file mode 100644 index 00000000000..642bd1889c4 --- /dev/null +++ b/pkg/kubelet/dockershim/checkpoint_store.go @@ -0,0 +1,114 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package dockershim + +import ( + "fmt" + "io/ioutil" + "os" + "path/filepath" + "regexp" + "strings" +) + +const ( + tmpPrefix = "." + tmpSuffix = ".tmp" + keyMaxLength = 250 +) + +var keyRegex = regexp.MustCompile("^[a-zA-Z0-9]+$") + +// CheckpointStore provides the interface for checkpoint storage backend. +// CheckpointStore must be thread-safe +type CheckpointStore interface { + // key must contain one or more characters in [A-Za-z0-9] + // Write persists a checkpoint with key + Write(key string, data []byte) error + // Read retrieves a checkpoint with key + Read(key string) ([]byte, error) + // Delete deletes a checkpoint with key + // Delete must not return error if checkpoint does not exist + Delete(key string) error + // List lists all keys of existing checkpoints + List() ([]string, error) +} + +// FileStore is an implementation of CheckpointStore interface which stores checkpoint in file. +type FileStore struct { + // path to the base directory for storing checkpoints + path string +} + +func (fstore *FileStore) Write(key string, data []byte) error { + if err := validateKey(key); err != nil { + return err + } + if _, err := os.Stat(fstore.path); err != nil { + // if directory already exists, proceed + if err = os.MkdirAll(fstore.path, 0755); err != nil && !os.IsExist(err) { + return err + } + } + tmpfile := filepath.Join(fstore.path, fmt.Sprintf("%s%s%s", tmpPrefix, key, tmpSuffix)) + if err := ioutil.WriteFile(tmpfile, data, 0644); err != nil { + return err + } + return os.Rename(tmpfile, fstore.getCheckpointPath(key)) +} + +func (fstore *FileStore) Read(key string) ([]byte, error) { + if err := validateKey(key); err != nil { + return nil, err + } + return ioutil.ReadFile(fstore.getCheckpointPath(key)) +} + +func (fstore *FileStore) Delete(key string) error { + if err := validateKey(key); err != nil { + return err + } + if err := os.Remove(fstore.getCheckpointPath(key)); err != nil && !os.IsNotExist(err) { + return err + } + return nil +} + +func (fstore *FileStore) List() ([]string, error) { + keys := make([]string, 0) + files, err := ioutil.ReadDir(fstore.path) + if err != nil { + return keys, err + } + for _, f := range files { + if !strings.HasSuffix(f.Name(), tmpSuffix) { + keys = append(keys, f.Name()) + } + } + return keys, nil +} + +func (fstore *FileStore) getCheckpointPath(key string) string { + return filepath.Join(fstore.path, key) +} + +func validateKey(key string) error { + if len(key) <= keyMaxLength && keyRegex.MatchString(key) { + return nil + } + return fmt.Errorf("checkpoint key %q is not valid.", key) +} diff --git a/pkg/kubelet/dockershim/checkpoint_store_test.go b/pkg/kubelet/dockershim/checkpoint_store_test.go new file mode 100644 index 00000000000..188cad82ef7 --- /dev/null +++ b/pkg/kubelet/dockershim/checkpoint_store_test.go @@ -0,0 +1,161 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package dockershim + +import ( + "io/ioutil" + "os" + "sort" + "testing" + + "github.com/stretchr/testify/assert" +) + +const ( + testPath = "/tmp/testFileStore" +) + +func TestFileStore(t *testing.T) { + path, err := ioutil.TempDir("", "FileStore") + assert.NoError(t, err) + defer cleanUpTestPath(t, path) + store := &FileStore{path: path} + + Checkpoints := []struct { + key string + data string + expectErr bool + }{ + { + "id1", + "data1", + false, + }, + { + "id2", + "data2", + false, + }, + { + "/id1", + "data1", + true, + }, + { + ".id1", + "data1", + true, + }, + { + " ", + "data2", + true, + }, + { + "___", + "data2", + true, + }, + } + + // Test Add Checkpoint + for _, c := range Checkpoints { + _, err = store.Read(c.key) + assert.Error(t, err) + + err = store.Write(c.key, []byte(c.data)) + if c.expectErr { + assert.Error(t, err) + continue + } else { + assert.NoError(t, err) + } + + // Test Read Checkpoint + data, err := store.Read(c.key) + assert.NoError(t, err) + assert.Equal(t, string(data), c.data) + } + + // Test list checkpoints. + keys, err := store.List() + assert.NoError(t, err) + sort.Strings(keys) + assert.Equal(t, keys, []string{"id1", "id2"}) + + // Test Delete Checkpoint + for _, c := range Checkpoints { + if c.expectErr { + continue + } + + err = store.Delete(c.key) + assert.NoError(t, err) + _, err = store.Read(c.key) + assert.Error(t, err) + } + + // Test delete non existed checkpoint + err = store.Delete("id1") + assert.NoError(t, err) + + // Test list checkpoints. + keys, err = store.List() + assert.NoError(t, err) + assert.Equal(t, len(keys), 0) +} + +func TestIsValidKey(t *testing.T) { + testcases := []struct { + key string + valid bool + }{ + { + " ", + false, + }, + { + "/foo/bar", + false, + }, + { + ".foo", + false, + }, + { + "a78768279290d33d0b82eaea43cb8346f500057cb5bd250e88c97a5585385d66", + true, + }, + } + + for _, tc := range testcases { + if tc.valid { + assert.NoError(t, validateKey(tc.key)) + } else { + assert.Error(t, validateKey(tc.key)) + } + } +} + +func cleanUpTestPath(t *testing.T, path string) { + if _, err := os.Stat(path); !os.IsNotExist(err) { + if err := os.RemoveAll(path); err != nil { + t.Errorf("Failed to delete test directory: %v", err) + } + } + return +} diff --git a/pkg/kubelet/dockershim/docker_checkpoint.go b/pkg/kubelet/dockershim/docker_checkpoint.go new file mode 100644 index 00000000000..dfb536a414a --- /dev/null +++ b/pkg/kubelet/dockershim/docker_checkpoint.go @@ -0,0 +1,144 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package dockershim + +import ( + "encoding/json" + "fmt" + "github.com/golang/glog" + "hash/fnv" + hashutil "k8s.io/kubernetes/pkg/util/hash" +) + +const ( + // default directory to store pod sandbox checkpoint files + sandboxCheckpointDir = "/var/lib/dockershim/sandbox" + protocolTCP = Protocol("tcp") + protocolUDP = Protocol("udp") + schemaVersion = "v1" +) + +var CorruptCheckpointError = fmt.Errorf("Checkpoint is corrupted.") + +type Protocol string + +// PortMapping is the port mapping configurations of a sandbox. +type PortMapping struct { + // Protocol of the port mapping. + Protocol *Protocol `json:"protocol,omitempty"` + // Port number within the container. + ContainerPort *int32 `json:"container_port,omitempty"` + // Port number on the host. + HostPort *int32 `json:"host_port,omitempty"` +} + +// CheckpointData contains all types of data that can be stored in the checkpoint. +type CheckpointData struct { + PortMappings []*PortMapping `json:"port_mappings,omitempty"` +} + +// PodSandboxCheckpoint is the checkpoint structure for a sandbox +type PodSandboxCheckpoint struct { + // Version of the pod sandbox checkpoint schema. + Version string `json:"version"` + // Pod name of the sandbox. Same as the pod name in the PodSpec. + Name string `json:"name"` + // Pod namespace of the sandbox. Same as the pod namespace in the PodSpec. + Namespace string `json:"namespace"` + // Data to checkpoint for pod sandbox. + Data *CheckpointData `json:"data,omitempty"` + // Checksum is calculated with fnv hash of the checkpoint object with checksum field set to be zero + CheckSum uint64 `json:"checksum"` +} + +// CheckpointHandler provides the interface to manage PodSandbox checkpoint +type CheckpointHandler interface { + // CreateCheckpoint persists sandbox checkpoint in CheckpointStore. + CreateCheckpoint(podSandboxID string, checkpoint *PodSandboxCheckpoint) error + // GetCheckpoint retrieves sandbox checkpoint from CheckpointStore. + GetCheckpoint(podSandboxID string) (*PodSandboxCheckpoint, error) + // RemoveCheckpoint removes sandbox checkpoint form CheckpointStore. + // WARNING: RemoveCheckpoint will not return error if checkpoint does not exist. + RemoveCheckpoint(podSandboxID string) error + // ListCheckpoint returns the list of existing checkpoints. + ListCheckpoints() ([]string, error) +} + +// PersistentCheckpointHandler is an implementation of CheckpointHandler. It persists checkpoint in CheckpointStore +type PersistentCheckpointHandler struct { + store CheckpointStore +} + +func NewPersistentCheckpointHandler() CheckpointHandler { + return &PersistentCheckpointHandler{store: &FileStore{path: sandboxCheckpointDir}} +} + +func (handler *PersistentCheckpointHandler) CreateCheckpoint(podSandboxID string, checkpoint *PodSandboxCheckpoint) error { + checkpoint.CheckSum = calculateChecksum(*checkpoint) + blob, err := json.Marshal(checkpoint) + if err != nil { + return err + } + return handler.store.Write(podSandboxID, blob) +} + +func (handler *PersistentCheckpointHandler) GetCheckpoint(podSandboxID string) (*PodSandboxCheckpoint, error) { + blob, err := handler.store.Read(podSandboxID) + if err != nil { + return nil, err + } + var checkpoint PodSandboxCheckpoint + //TODO: unmarhsal into a struct with just Version, check version, unmarshal into versioned type. + err = json.Unmarshal(blob, &checkpoint) + if err != nil { + glog.Errorf("Failed to unmarshal checkpoint %q: %v", podSandboxID, err) + return &checkpoint, CorruptCheckpointError + } + if checkpoint.CheckSum != calculateChecksum(checkpoint) { + glog.Errorf("Checksum of checkpoint %q is not valid", podSandboxID) + return &checkpoint, CorruptCheckpointError + } + return &checkpoint, nil +} + +func (handler *PersistentCheckpointHandler) RemoveCheckpoint(podSandboxID string) error { + return handler.store.Delete(podSandboxID) +} + +func (handler *PersistentCheckpointHandler) ListCheckpoints() ([]string, error) { + keys, err := handler.store.List() + if err != nil { + return []string{}, fmt.Errorf("failed to list checkpoint store: %v", err) + } + return keys, nil +} + +func NewPodSandboxCheckpoint(namespace, name string) *PodSandboxCheckpoint { + return &PodSandboxCheckpoint{ + Version: schemaVersion, + Namespace: namespace, + Name: name, + Data: &CheckpointData{}, + } +} + +func calculateChecksum(checkpoint PodSandboxCheckpoint) uint64 { + checkpoint.CheckSum = 0 + hash := fnv.New32a() + hashutil.DeepHashObject(hash, checkpoint) + return uint64(hash.Sum32()) +} diff --git a/pkg/kubelet/dockershim/docker_checkpoint_test.go b/pkg/kubelet/dockershim/docker_checkpoint_test.go new file mode 100644 index 00000000000..c5d428329d3 --- /dev/null +++ b/pkg/kubelet/dockershim/docker_checkpoint_test.go @@ -0,0 +1,98 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package dockershim + +import ( + "sort" + "testing" + + "github.com/stretchr/testify/assert" + utilstore "k8s.io/kubernetes/pkg/kubelet/dockershim/testing" +) + +func NewTestPersistentCheckpointHandler() CheckpointHandler { + return &PersistentCheckpointHandler{store: utilstore.NewMemStore()} +} + +func TestPersistentCheckpointHandler(t *testing.T) { + var err error + handler := NewTestPersistentCheckpointHandler() + port80 := int32(80) + port443 := int32(443) + proto := protocolTCP + + checkpoint1 := NewPodSandboxCheckpoint("ns1", "sandbox1") + checkpoint1.Data.PortMappings = []*PortMapping{ + { + &proto, + &port80, + &port80, + }, + { + &proto, + &port443, + &port443, + }, + } + + checkpoints := []struct { + podSandboxID string + checkpoint *PodSandboxCheckpoint + }{ + { + "id1", + checkpoint1, + }, + { + "id2", + NewPodSandboxCheckpoint("ns2", "sandbox2"), + }, + } + + for _, tc := range checkpoints { + // Test CreateCheckpoints + err = handler.CreateCheckpoint(tc.podSandboxID, tc.checkpoint) + assert.NoError(t, err) + + // Test GetCheckpoints + checkpoint, err := handler.GetCheckpoint(tc.podSandboxID) + assert.NoError(t, err) + assert.Equal(t, *checkpoint, *tc.checkpoint) + } + + // Test ListCheckpoints + keys, err := handler.ListCheckpoints() + assert.NoError(t, err) + sort.Strings(keys) + assert.Equal(t, keys, []string{"id1", "id2"}) + + // Test RemoveCheckpoints + err = handler.RemoveCheckpoint("id1") + assert.NoError(t, err) + // Test Remove Nonexisted Checkpoints + err = handler.RemoveCheckpoint("id1") + assert.NoError(t, err) + + // Test ListCheckpoints + keys, err = handler.ListCheckpoints() + assert.NoError(t, err) + assert.Equal(t, keys, []string{"id2"}) + + // Test Get NonExisted Checkpoint + _, err = handler.GetCheckpoint("id1") + assert.Error(t, err) +} diff --git a/pkg/kubelet/dockershim/testing/util.go b/pkg/kubelet/dockershim/testing/util.go new file mode 100644 index 00000000000..cf870a99e7a --- /dev/null +++ b/pkg/kubelet/dockershim/testing/util.go @@ -0,0 +1,66 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package testing + +import ( + "fmt" + "sync" +) + +// MemStore is an implementation of CheckpointStore interface which stores checkpoint in memory. +type MemStore struct { + mem map[string][]byte + sync.Mutex +} + +func NewMemStore() *MemStore { + return &MemStore{mem: make(map[string][]byte)} +} + +func (mstore *MemStore) Write(key string, data []byte) error { + mstore.Lock() + defer mstore.Unlock() + mstore.mem[key] = data + return nil +} + +func (mstore *MemStore) Read(key string) ([]byte, error) { + mstore.Lock() + defer mstore.Unlock() + data, ok := mstore.mem[key] + if !ok { + return nil, fmt.Errorf("checkpoint %q could not be found", key) + } + return data, nil +} + +func (mstore *MemStore) Delete(key string) error { + mstore.Lock() + defer mstore.Unlock() + delete(mstore.mem, key) + return nil +} + +func (mstore *MemStore) List() ([]string, error) { + mstore.Lock() + defer mstore.Unlock() + keys := make([]string, 0) + for key := range mstore.mem { + keys = append(keys, key) + } + return keys, nil +}