Merge pull request #54085 from yujuhong/checkpoint-pkg

Automatic merge from submit-queue. If you want to cherry-pick this change to another branch, please follow the instructions <a href="https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md">here</a>.

Add a file store utility package in kubelet

More and more components checkpoints (i.e., persist their states) in
kubelet. Refurbish and move the implementation in dockershim to a
utility package to improve code reusability.
This commit is contained in:
Kubernetes Submit Queue 2017-11-02 13:50:16 -07:00 committed by GitHub
commit dc35709eee
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 363 additions and 216 deletions

View File

@ -9,7 +9,6 @@ load(
go_library(
name = "go_default_library",
srcs = [
"checkpoint_store.go",
"convert.go",
"doc.go",
"docker_checkpoint.go",
@ -61,7 +60,9 @@ go_library(
"//pkg/kubelet/types:go_default_library",
"//pkg/kubelet/util/cache:go_default_library",
"//pkg/kubelet/util/ioutils:go_default_library",
"//pkg/kubelet/util/store:go_default_library",
"//pkg/security/apparmor:go_default_library",
"//pkg/util/filesystem:go_default_library",
"//pkg/util/hash:go_default_library",
"//pkg/util/parsers:go_default_library",
"//vendor/github.com/armon/circbuf:go_default_library",
@ -84,7 +85,6 @@ go_library(
go_test(
name = "go_default_test",
srcs = [
"checkpoint_store_test.go",
"convert_test.go",
"docker_checkpoint_test.go",
"docker_container_test.go",
@ -111,7 +111,6 @@ go_test(
"//pkg/kubelet/apis/cri/v1alpha1/runtime:go_default_library",
"//pkg/kubelet/container:go_default_library",
"//pkg/kubelet/container/testing:go_default_library",
"//pkg/kubelet/dockershim/errors:go_default_library",
"//pkg/kubelet/dockershim/libdocker:go_default_library",
"//pkg/kubelet/dockershim/testing:go_default_library",
"//pkg/kubelet/network:go_default_library",

View File

@ -1,156 +0,0 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package dockershim
import (
"fmt"
"io"
"io/ioutil"
"os"
"path/filepath"
"regexp"
"strings"
"k8s.io/kubernetes/pkg/kubelet/dockershim/errors"
)
const (
tmpPrefix = "."
tmpSuffix = ".tmp"
keyMaxLength = 250
)
var keyRegex = regexp.MustCompile("^[a-zA-Z0-9]+$")
// CheckpointStore provides the interface for checkpoint storage backend.
// CheckpointStore must be thread-safe
type CheckpointStore interface {
// key must contain one or more characters in [A-Za-z0-9]
// Write persists a checkpoint with key
Write(key string, data []byte) error
// Read retrieves a checkpoint with key
// Read must return CheckpointNotFoundError if checkpoint is not found
Read(key string) ([]byte, error)
// Delete deletes a checkpoint with key
// Delete must not return error if checkpoint does not exist
Delete(key string) error
// List lists all keys of existing checkpoints
List() ([]string, error)
}
// FileStore is an implementation of CheckpointStore interface which stores checkpoint in files.
type FileStore struct {
// path to the base directory for storing checkpoint files
path string
}
func NewFileStore(path string) (CheckpointStore, error) {
if err := ensurePath(path); err != nil {
return nil, err
}
return &FileStore{path: path}, nil
}
// writeFileAndSync is copied from ioutil.WriteFile, with the extra File.Sync
// at the end to ensure file is written on the disk.
func writeFileAndSync(filename string, data []byte, perm os.FileMode) error {
f, err := os.OpenFile(filename, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, perm)
if err != nil {
return err
}
n, err := f.Write(data)
if err == nil && n < len(data) {
err = io.ErrShortWrite
}
if err == nil {
// Only sync if the Write completed successfully.
err = f.Sync()
}
if err1 := f.Close(); err == nil {
err = err1
}
return err
}
func (fstore *FileStore) Write(key string, data []byte) error {
if err := validateKey(key); err != nil {
return err
}
if err := ensurePath(fstore.path); err != nil {
return err
}
tmpfile := filepath.Join(fstore.path, fmt.Sprintf("%s%s%s", tmpPrefix, key, tmpSuffix))
if err := writeFileAndSync(tmpfile, data, 0644); err != nil {
return err
}
return os.Rename(tmpfile, fstore.getCheckpointPath(key))
}
func (fstore *FileStore) Read(key string) ([]byte, error) {
if err := validateKey(key); err != nil {
return nil, err
}
bytes, err := ioutil.ReadFile(fstore.getCheckpointPath(key))
if os.IsNotExist(err) {
return bytes, errors.CheckpointNotFoundError
}
return bytes, err
}
func (fstore *FileStore) Delete(key string) error {
if err := validateKey(key); err != nil {
return err
}
if err := os.Remove(fstore.getCheckpointPath(key)); err != nil && !os.IsNotExist(err) {
return err
}
return nil
}
func (fstore *FileStore) List() ([]string, error) {
keys := make([]string, 0)
files, err := ioutil.ReadDir(fstore.path)
if err != nil {
return keys, err
}
for _, f := range files {
if !strings.HasPrefix(f.Name(), tmpPrefix) {
keys = append(keys, f.Name())
}
}
return keys, nil
}
func (fstore *FileStore) getCheckpointPath(key string) string {
return filepath.Join(fstore.path, key)
}
// ensurePath creates input directory if it does not exist
func ensurePath(path string) error {
if _, err := os.Stat(path); err != nil {
// MkdirAll returns nil if directory already exists
return os.MkdirAll(path, 0755)
}
return nil
}
func validateKey(key string) error {
if len(key) <= keyMaxLength && keyRegex.MatchString(key) {
return nil
}
return fmt.Errorf("checkpoint key %q is not valid.", key)
}

View File

@ -24,6 +24,8 @@ import (
"github.com/golang/glog"
"k8s.io/kubernetes/pkg/kubelet/dockershim/errors"
utilstore "k8s.io/kubernetes/pkg/kubelet/util/store"
utilfs "k8s.io/kubernetes/pkg/util/filesystem"
hashutil "k8s.io/kubernetes/pkg/util/hash"
)
@ -82,11 +84,11 @@ type CheckpointHandler interface {
// PersistentCheckpointHandler is an implementation of CheckpointHandler. It persists checkpoint in CheckpointStore
type PersistentCheckpointHandler struct {
store CheckpointStore
store utilstore.Store
}
func NewPersistentCheckpointHandler(dockershimRootDir string) (CheckpointHandler, error) {
fstore, err := NewFileStore(filepath.Join(dockershimRootDir, sandboxCheckpointDir))
fstore, err := utilstore.NewFileStore(filepath.Join(dockershimRootDir, sandboxCheckpointDir), utilfs.DefaultFs{})
if err != nil {
return nil, err
}

View File

@ -64,6 +64,7 @@ filegroup(
"//pkg/kubelet/util/ioutils:all-srcs",
"//pkg/kubelet/util/queue:all-srcs",
"//pkg/kubelet/util/sliceutils:all-srcs",
"//pkg/kubelet/util/store:all-srcs",
],
tags = ["automanaged"],
)

View File

@ -0,0 +1,41 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
go_library(
name = "go_default_library",
srcs = [
"doc.go",
"filestore.go",
"store.go",
],
importpath = "k8s.io/kubernetes/pkg/kubelet/util/store",
visibility = ["//visibility:public"],
deps = ["//pkg/util/filesystem:go_default_library"],
)
go_test(
name = "go_default_test",
srcs = [
"filestore_test.go",
"store_test.go",
],
importpath = "k8s.io/kubernetes/pkg/kubelet/util/store",
library = ":go_default_library",
deps = [
"//pkg/util/filesystem:go_default_library",
"//vendor/github.com/stretchr/testify/assert:go_default_library",
],
)
filegroup(
name = "package-srcs",
srcs = glob(["**"]),
tags = ["automanaged"],
visibility = ["//visibility:private"],
)
filegroup(
name = "all-srcs",
srcs = [":package-srcs"],
tags = ["automanaged"],
visibility = ["//visibility:public"],
)

View File

@ -0,0 +1,18 @@
/*
Copyright 2015 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package store hosts a Store interface and its implementations.
package store // import "k8s.io/kubernetes/pkg/kubelet/util/store"

View File

@ -0,0 +1,157 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package store
import (
"fmt"
"os"
"path/filepath"
"strings"
utilfs "k8s.io/kubernetes/pkg/util/filesystem"
)
const (
// Name prefix for the temporary files.
tmpPrefix = "."
)
// FileStore is an implementation of the Store interface which stores data in files.
type FileStore struct {
// Absolute path to the base directory for storing data files.
directoryPath string
// filesystem to use.
filesystem utilfs.Filesystem
}
// NewFileStore returns an instance of FileStore.
func NewFileStore(path string, fs utilfs.Filesystem) (Store, error) {
if err := ensureDirectory(fs, path); err != nil {
return nil, err
}
return &FileStore{directoryPath: path, filesystem: fs}, nil
}
// Write writes the given data to a file named key.
func (f *FileStore) Write(key string, data []byte) error {
if err := ValidateKey(key); err != nil {
return err
}
if err := ensureDirectory(f.filesystem, f.directoryPath); err != nil {
return err
}
return writeFile(f.filesystem, f.getPathByKey(key), data)
}
// Read reads the data from the file named key.
func (f *FileStore) Read(key string) ([]byte, error) {
if err := ValidateKey(key); err != nil {
return nil, err
}
bytes, err := f.filesystem.ReadFile(f.getPathByKey(key))
if os.IsNotExist(err) {
return bytes, ErrKeyNotFound
}
return bytes, err
}
// Delete deletes the key file.
func (f *FileStore) Delete(key string) error {
if err := ValidateKey(key); err != nil {
return err
}
return removePath(f.filesystem, f.getPathByKey(key))
}
// List returns all keys in the store.
func (f *FileStore) List() ([]string, error) {
keys := make([]string, 0)
files, err := f.filesystem.ReadDir(f.directoryPath)
if err != nil {
return keys, err
}
for _, f := range files {
if !strings.HasPrefix(f.Name(), tmpPrefix) {
keys = append(keys, f.Name())
}
}
return keys, nil
}
// getPathByKey returns the full path of the file for the key.
func (f *FileStore) getPathByKey(key string) string {
return filepath.Join(f.directoryPath, key)
}
// ensureDirectory creates the directory if it does not exist.
func ensureDirectory(fs utilfs.Filesystem, path string) error {
if _, err := fs.Stat(path); err != nil {
// MkdirAll returns nil if directory already exists.
return fs.MkdirAll(path, 0755)
}
return nil
}
// writeFile writes data to path in a single transaction.
func writeFile(fs utilfs.Filesystem, path string, data []byte) (retErr error) {
// Create a temporary file in the base directory of `path` with a prefix.
tmpFile, err := fs.TempFile(filepath.Dir(path), tmpPrefix)
if err != nil {
return err
}
tmpPath := tmpFile.Name()
defer func() {
// Close the file.
if err := tmpFile.Close(); err != nil {
if retErr == nil {
retErr = err
} else {
retErr = fmt.Errorf("failed to close temp file after error %v; close error: %v", retErr, err)
}
}
// Clean up the temp file on error.
if retErr != nil && tmpPath != "" {
if err := removePath(fs, tmpPath); err != nil {
retErr = fmt.Errorf("failed to remove the temporary file (%q) after error %v; remove error: %v", tmpPath, retErr, err)
}
}
}()
// Write data.
if _, err := tmpFile.Write(data); err != nil {
return err
}
// Sync file.
if err := tmpFile.Sync(); err != nil {
return err
}
return fs.Rename(tmpPath, path)
}
func removePath(fs utilfs.Filesystem, path string) error {
if err := fs.Remove(path); err != nil && !os.IsNotExist(err) {
return err
}
return nil
}

View File

@ -14,26 +14,24 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package dockershim
package store
import (
"io/ioutil"
"os"
"sort"
"testing"
"github.com/stretchr/testify/assert"
"k8s.io/kubernetes/pkg/kubelet/dockershim/errors"
"k8s.io/kubernetes/pkg/util/filesystem"
)
func TestFileStore(t *testing.T) {
path, err := ioutil.TempDir("", "FileStore")
assert.NoError(t, err)
defer cleanUpTestPath(t, path)
store, err := NewFileStore(path)
store, err := NewFileStore(path, filesystem.NewFakeFs())
assert.NoError(t, err)
Checkpoints := []struct {
testCases := []struct {
key string
data string
expectErr bool
@ -70,8 +68,8 @@ func TestFileStore(t *testing.T) {
},
}
// Test Add Checkpoint
for _, c := range Checkpoints {
// Test add data.
for _, c := range testCases {
_, err = store.Read(c.key)
assert.Error(t, err)
@ -83,20 +81,20 @@ func TestFileStore(t *testing.T) {
assert.NoError(t, err)
}
// Test Read Checkpoint
// Test read data by key.
data, err := store.Read(c.key)
assert.NoError(t, err)
assert.Equal(t, string(data), c.data)
}
// Test list checkpoints.
// Test list keys.
keys, err := store.List()
assert.NoError(t, err)
sort.Strings(keys)
assert.Equal(t, keys, []string{"id1", "id2"})
// Test Delete Checkpoint
for _, c := range Checkpoints {
// Test Delete data
for _, c := range testCases {
if c.expectErr {
continue
}
@ -104,55 +102,15 @@ func TestFileStore(t *testing.T) {
err = store.Delete(c.key)
assert.NoError(t, err)
_, err = store.Read(c.key)
assert.EqualValues(t, errors.CheckpointNotFoundError, err)
assert.EqualValues(t, ErrKeyNotFound, err)
}
// Test delete non existed checkpoint
// Test delete non-existent key.
err = store.Delete("id1")
assert.NoError(t, err)
// Test list checkpoints.
// Test list keys.
keys, err = store.List()
assert.NoError(t, err)
assert.Equal(t, len(keys), 0)
}
func TestIsValidKey(t *testing.T) {
testcases := []struct {
key string
valid bool
}{
{
" ",
false,
},
{
"/foo/bar",
false,
},
{
".foo",
false,
},
{
"a78768279290d33d0b82eaea43cb8346f500057cb5bd250e88c97a5585385d66",
true,
},
}
for _, tc := range testcases {
if tc.valid {
assert.NoError(t, validateKey(tc.key))
} else {
assert.Error(t, validateKey(tc.key))
}
}
}
func cleanUpTestPath(t *testing.T, path string) {
if _, err := os.Stat(path); !os.IsNotExist(err) {
if err := os.RemoveAll(path); err != nil {
assert.NoError(t, err, "Failed to delete test directory: %v", err)
}
}
}

View File

@ -0,0 +1,64 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package store
import (
"fmt"
"regexp"
)
const (
keyMaxLength = 250
keyCharFmt string = "[A-Za-z0-9]"
keyExtCharFmt string = "[-A-Za-z0-9_.]"
qualifiedKeyFmt string = "(" + keyCharFmt + keyExtCharFmt + "*)?" + keyCharFmt
)
var (
// Key must consist of alphanumeric characters, '-', '_' or '.', and must start
// and end with an alphanumeric character.
keyRegex = regexp.MustCompile("^" + qualifiedKeyFmt + "$")
// ErrKeyNotFound is the error returned if key is not found in Store.
ErrKeyNotFound = fmt.Errorf("key is not found")
)
// Store provides the interface for storing keyed data.
// Store must be thread-safe
type Store interface {
// key must contain one or more characters in [A-Za-z0-9]
// Write writes data with key.
Write(key string, data []byte) error
// Read retrieves data with key
// Read must return ErrKeyNotFound if key is not found.
Read(key string) ([]byte, error)
// Delete deletes data by key
// Delete must not return error if key does not exist
Delete(key string) error
// List lists all existing keys.
List() ([]string, error)
}
// ValidateKey returns an error if the given key does not meet the requirement
// of the key format and length.
func ValidateKey(key string) error {
if len(key) <= keyMaxLength && keyRegex.MatchString(key) {
return nil
}
return fmt.Errorf("invalid key: %q", key)
}

View File

@ -0,0 +1,63 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package store
import (
"testing"
"github.com/stretchr/testify/assert"
)
func TestIsValidKey(t *testing.T) {
testcases := []struct {
key string
valid bool
}{
{
" ",
false,
},
{
"/foo/bar",
false,
},
{
".foo",
false,
},
{
"a78768279290d33d0b82eaea43cb8346f500057cb5bd250e88c97a5585385d66",
true,
},
{
"a7.87-6_8",
true,
},
{
"a7.87-677-",
false,
},
}
for _, tc := range testcases {
if tc.valid {
assert.NoError(t, ValidateKey(tc.key))
} else {
assert.Error(t, ValidateKey(tc.key))
}
}
}