persist: demo code for persist api

Demonstrate how to make use of `virtcontainer/persist/api` data structure
package.

Signed-off-by: Wei Zhang <zhangwei555@huawei.com>
This commit is contained in:
Wei Zhang 2018-11-05 11:00:35 +08:00
parent e14ffb40cf
commit b42fde69c0
10 changed files with 544 additions and 11 deletions

View File

@ -224,6 +224,10 @@ func StartSandbox(ctx context.Context, sandboxID string) (VCSandbox, error) {
return nil, err return nil, err
} }
if err = s.storeSandbox(); err != nil {
return nil, err
}
return s, nil return s, nil
} }
@ -256,6 +260,10 @@ func StopSandbox(ctx context.Context, sandboxID string) (VCSandbox, error) {
return nil, err return nil, err
} }
if err = s.storeSandbox(); err != nil {
return nil, err
}
return s, nil return s, nil
} }
@ -394,6 +402,10 @@ func CreateContainer(ctx context.Context, sandboxID string, containerConfig Cont
return nil, nil, err return nil, nil, err
} }
if err = s.storeSandbox(); err != nil {
return nil, nil, err
}
return s, c, nil return s, c, nil
} }

View File

@ -374,6 +374,9 @@ func (c *Container) GetAnnotations() map[string]string {
// storeContainer stores a container config. // storeContainer stores a container config.
func (c *Container) storeContainer() error { func (c *Container) storeContainer() error {
if err := c.sandbox.newStore.Dump(); err != nil {
return err
}
return c.store.Store(store.Configuration, *(c.config)) return c.store.Store(store.Configuration, *(c.config))
} }

View File

@ -20,6 +20,7 @@ import (
"github.com/kata-containers/runtime/virtcontainers/device/config" "github.com/kata-containers/runtime/virtcontainers/device/config"
"github.com/kata-containers/runtime/virtcontainers/device/drivers" "github.com/kata-containers/runtime/virtcontainers/device/drivers"
"github.com/kata-containers/runtime/virtcontainers/device/manager" "github.com/kata-containers/runtime/virtcontainers/device/manager"
"github.com/kata-containers/runtime/virtcontainers/persist"
"github.com/kata-containers/runtime/virtcontainers/store" "github.com/kata-containers/runtime/virtcontainers/store"
"github.com/kata-containers/runtime/virtcontainers/types" "github.com/kata-containers/runtime/virtcontainers/types"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
@ -247,6 +248,10 @@ func TestContainerAddDriveDir(t *testing.T) {
assert.Nil(t, err) assert.Nil(t, err)
container.store = containerStore container.store = containerStore
if sandbox.newStore, err = persist.GetDriver("fs"); err != nil || sandbox.newStore == nil {
t.Fatalf("failed to get fs persist driver")
}
// create state file // create state file
path := store.ContainerRuntimeRootPath(testSandboxID, container.ID()) path := store.ContainerRuntimeRootPath(testSandboxID, container.ID())
stateFilePath := filepath.Join(path, store.StateFile) stateFilePath := filepath.Join(path, store.StateFile)

96
virtcontainers/persist.go Normal file
View File

@ -0,0 +1,96 @@
// Copyright (c) 2018 Huawei Corporation
//
// SPDX-License-Identifier: Apache-2.0
//
package virtcontainers
import (
//"fmt"
//"github.com/sirupsen/logrus"
persistapi "github.com/kata-containers/runtime/virtcontainers/persist/api"
"github.com/kata-containers/runtime/virtcontainers/types"
)
func (s *Sandbox) dumpState(ss *persistapi.SandboxState, cs map[string]persistapi.ContainerState) error {
ss.SandboxContainer = s.id
ss.GuestMemoryBlockSizeMB = s.state.GuestMemoryBlockSizeMB
ss.State = string(s.state.State)
for id, cont := range s.containers {
cs[id] = persistapi.ContainerState{
State: string(cont.state.State),
Rootfs: persistapi.RootfsState{
BlockDeviceID: cont.state.BlockDeviceID,
FsType: cont.state.Fstype,
},
}
}
return nil
}
func (s *Sandbox) dumpHypervisor(ss *persistapi.SandboxState, cs map[string]persistapi.ContainerState) error {
ss.HypervisorState.BlockIndex = s.state.BlockIndex
return nil
}
// PersistVersion set persist data version to current version in runtime
func (s *Sandbox) persistVersion() {
s.newStore.RegisterHook("version", func(ss *persistapi.SandboxState, cs map[string]persistapi.ContainerState) error {
ss.PersistVersion = persistapi.CurPersistVersion
return nil
})
}
// PersistState register hook to set sandbox and container state to persist
func (s *Sandbox) persistState() {
s.newStore.RegisterHook("state", s.dumpState)
}
// PersistHvState register hook to save hypervisor state to persist data
func (s *Sandbox) persistHvState() {
s.newStore.RegisterHook("hypervisor", s.dumpHypervisor)
}
// Restore will restore data from persist disk on disk
func (s *Sandbox) Restore() error {
if err := s.newStore.Restore(s.id); err != nil {
return err
}
ss, _, err := s.newStore.GetStates()
if err != nil {
return err
}
/*
// TODO: need more modifications, restoring containers
// will make sandbox.addContainer failing
if s.containers == nil {
s.containers = make(map[string]*Container)
}
for id, cont := range cs {
s.containers[id] = &Container{
state: State{
State: stateString(cont.State),
BlockDeviceID: cont.Rootfs.BlockDeviceID,
Fstype: cont.Rootfs.FsType,
Pid: cont.ShimPid,
},
}
}
sbxCont, ok := s.containers[ss.SandboxContainer]
if !ok {
return fmt.Errorf("failed to get sandbox container state")
}
*/
s.state.GuestMemoryBlockSizeMB = ss.GuestMemoryBlockSizeMB
s.state.BlockIndex = ss.HypervisorState.BlockIndex
s.state.State = types.StateString(ss.State)
return nil
}

View File

@ -0,0 +1,16 @@
// Copyright (c) 2018 Huawei Corporation
//
// SPDX-License-Identifier: Apache-2.0
//
package persistapi
// PersistDriver is interface describing operations to save/restore persist data
type PersistDriver interface {
// Dump persist data to
Dump() error
Restore(sid string) error
Destroy() error
GetStates() (*SandboxState, map[string]ContainerState, error)
RegisterHook(name string, f SetFunc)
}

View File

@ -0,0 +1,270 @@
// Copyright (c) 2016 Intel Corporation
// Copyright (c) 2018 Huawei Corporation
//
// SPDX-License-Identifier: Apache-2.0
//
package fs
import (
"encoding/json"
"fmt"
"os"
"path/filepath"
"syscall"
persistapi "github.com/kata-containers/runtime/virtcontainers/persist/api"
)
// persistFile is the file name for JSON sandbox/container configuration
const persistFile = "persist.json"
// dirMode is the permission bits used for creating a directory
const dirMode = os.FileMode(0750)
// fileMode is the permission bits used for creating a file
const fileMode = os.FileMode(0640)
// storagePathSuffix is the suffix used for all storage paths
//
// Note: this very brief path represents "virtcontainers". It is as
// terse as possible to minimise path length.
const storagePathSuffix = "vc"
// sandboxPathSuffix is the suffix used for sandbox storage
const sandboxPathSuffix = "sbs"
// runStoragePath is the sandbox runtime directory.
// It will contain one state.json and one lock file for each created sandbox.
var runStoragePath = filepath.Join("/run", storagePathSuffix, sandboxPathSuffix)
// FS storage driver implementation
type FS struct {
sandboxState *persistapi.SandboxState
containerState map[string]persistapi.ContainerState
setFuncs map[string]persistapi.SetFunc
lockFile *os.File
}
// Name returns driver name
func Name() string {
return "fs"
}
// Init FS persist driver and return abstract PersistDriver
func Init() (persistapi.PersistDriver, error) {
return &FS{
sandboxState: &persistapi.SandboxState{},
containerState: make(map[string]persistapi.ContainerState),
setFuncs: make(map[string]persistapi.SetFunc),
}, nil
}
func (fs *FS) sandboxDir() (string, error) {
if fs.sandboxState.SandboxContainer == "" {
return "", fmt.Errorf("sandbox container id required")
}
return filepath.Join(runStoragePath, fs.sandboxState.SandboxContainer), nil
}
// Dump sandboxState and containerState to disk
func (fs *FS) Dump() (retErr error) {
// call registered hooks to set sandboxState and containerState
for _, fun := range fs.setFuncs {
fun(fs.sandboxState, fs.containerState)
}
// if error happened, destroy all dirs
defer func() {
if retErr != nil {
// TODO: log error
fs.Destroy()
}
}()
sandboxDir, err := fs.sandboxDir()
if err != nil {
return err
}
if err := os.MkdirAll(sandboxDir, dirMode); err != nil {
return err
}
if err := fs.lock(); err != nil {
return err
}
defer fs.unlock()
// persist sandbox configuration data
sandboxFile := filepath.Join(sandboxDir, persistFile)
f, err := os.OpenFile(sandboxFile, os.O_RDWR|os.O_CREATE|os.O_TRUNC, fileMode)
if err != nil {
return err
}
defer f.Close()
if err := json.NewEncoder(f).Encode(fs.sandboxState); err != nil {
return err
}
// persist container configuration data
for cid, cstate := range fs.containerState {
cdir := filepath.Join(sandboxDir, cid)
if err := os.MkdirAll(cdir, dirMode); err != nil {
return err
}
cfile := filepath.Join(cdir, persistFile)
cf, err := os.OpenFile(cfile, os.O_RDWR|os.O_CREATE|os.O_TRUNC, fileMode)
if err != nil {
return err
}
if err := json.NewEncoder(cf).Encode(cstate); err != nil {
return err
}
cf.Close()
}
return nil
}
// Restore state for sandbox with name sid
func (fs *FS) Restore(sid string) error {
if sid == "" {
return fmt.Errorf("restore requires sandbox id")
}
fs.sandboxState.SandboxContainer = sid
sandboxDir, err := fs.sandboxDir()
if err != nil {
return err
}
if err := os.MkdirAll(sandboxDir, dirMode); err != nil {
return err
}
if err := fs.lock(); err != nil {
return err
}
defer fs.unlock()
// get sandbox configuration from persist data
sandboxFile := filepath.Join(sandboxDir, persistFile)
f, err := os.OpenFile(sandboxFile, os.O_RDONLY, fileMode)
if err != nil {
return err
}
defer f.Close()
if err := json.NewDecoder(f).Decode(fs.sandboxState); err != nil {
return err
}
// walk sandbox dir and find container
d, err := os.OpenFile(sandboxDir, os.O_RDONLY, fileMode)
if err != nil {
return err
}
defer d.Close()
files, err := d.Readdir(-1)
if err != nil {
return err
}
for _, file := range files {
if !file.IsDir() {
continue
}
cid := file.Name()
cfile := filepath.Join(sandboxDir, cid, persistFile)
cf, err := os.OpenFile(cfile, os.O_RDONLY, fileMode)
if err != nil {
// if persist.json doesn't exist, ignore and go to next
if os.IsNotExist(err) {
continue
}
return err
}
var cstate persistapi.ContainerState
if err := json.NewDecoder(cf).Decode(&cstate); err != nil {
return err
}
cf.Close()
fs.containerState[cid] = cstate
}
return nil
}
// Destroy removes everything from disk
func (fs *FS) Destroy() error {
sandboxDir, err := fs.sandboxDir()
if err != nil {
return err
}
if err := os.RemoveAll(sandboxDir); err != nil {
return err
}
return nil
}
// GetStates returns SandboxState and ContainerState
func (fs *FS) GetStates() (*persistapi.SandboxState, map[string]persistapi.ContainerState, error) {
return fs.sandboxState, fs.containerState, nil
}
// RegisterHook registers processing hooks for Dump
func (fs *FS) RegisterHook(name string, f persistapi.SetFunc) {
// only accept last registered hook with same name
fs.setFuncs[name] = f
}
func (fs *FS) lock() error {
sandboxDir, err := fs.sandboxDir()
if err != nil {
return err
}
f, err := os.Open(sandboxDir)
if err != nil {
return err
}
if err := syscall.Flock(int(f.Fd()), syscall.LOCK_EX|syscall.LOCK_NB); err != nil {
f.Close()
return err
}
fs.lockFile = f
return nil
}
func (fs *FS) unlock() error {
if fs.lockFile == nil {
return nil
}
lockFile := fs.lockFile
defer lockFile.Close()
fs.lockFile = nil
if err := syscall.Flock(int(lockFile.Fd()), syscall.LOCK_UN); err != nil {
return err
}
return nil
}
// TestSetRunStoragePath set runStoragePath to path
// this function is only used for testing purpose
func TestSetRunStoragePath(path string) {
runStoragePath = path
}

View File

@ -0,0 +1,31 @@
// Copyright (c) 2018 Huawei Corporation
//
// SPDX-License-Identifier: Apache-2.0
//
package persist
import (
"fmt"
"github.com/kata-containers/runtime/virtcontainers/persist/api"
"github.com/kata-containers/runtime/virtcontainers/persist/fs"
)
type initFunc (func() (persistapi.PersistDriver, error))
var (
supportedDrivers = map[string]initFunc{
"fs": fs.Init,
}
defaultDriver = "fs"
)
// GetDriver returns new PersistDriver according to driver name
func GetDriver(name string) (persistapi.PersistDriver, error) {
if f, ok := supportedDrivers[name]; ok {
return f()
}
return nil, fmt.Errorf("failed to get storage driver %q", name)
}

View File

@ -0,0 +1,2 @@
This package is a simple placeholder currently which will contain persist storage plugin support,
e.g. leveldb, sqlite and other possible storage implementations.

View File

@ -27,6 +27,8 @@ import (
"github.com/kata-containers/runtime/virtcontainers/device/drivers" "github.com/kata-containers/runtime/virtcontainers/device/drivers"
deviceManager "github.com/kata-containers/runtime/virtcontainers/device/manager" deviceManager "github.com/kata-containers/runtime/virtcontainers/device/manager"
exp "github.com/kata-containers/runtime/virtcontainers/experimental" exp "github.com/kata-containers/runtime/virtcontainers/experimental"
"github.com/kata-containers/runtime/virtcontainers/persist"
persistapi "github.com/kata-containers/runtime/virtcontainers/persist/api"
"github.com/kata-containers/runtime/virtcontainers/pkg/annotations" "github.com/kata-containers/runtime/virtcontainers/pkg/annotations"
vcTypes "github.com/kata-containers/runtime/virtcontainers/pkg/types" vcTypes "github.com/kata-containers/runtime/virtcontainers/pkg/types"
"github.com/kata-containers/runtime/virtcontainers/store" "github.com/kata-containers/runtime/virtcontainers/store"
@ -159,8 +161,11 @@ type Sandbox struct {
hypervisor hypervisor hypervisor hypervisor
agent agent agent agent
store *store.VCStore store *store.VCStore
network Network // store is used to replace VCStore step by step
monitor *monitor newStore persistapi.PersistDriver
network Network
monitor *monitor
config *SandboxConfig config *SandboxConfig
@ -472,15 +477,25 @@ func createSandbox(ctx context.Context, sandboxConfig SandboxConfig, factory Fac
} }
s.devManager = deviceManager.NewDeviceManager(sandboxConfig.HypervisorConfig.BlockDeviceDriver, devices) s.devManager = deviceManager.NewDeviceManager(sandboxConfig.HypervisorConfig.BlockDeviceDriver, devices)
// register persist hook for now, data will be written to disk by Dump()
s.persistState()
s.persistHvState()
if err := s.Restore(); err == nil && s.state.State != "" {
return s, nil
}
// We first try to fetch the sandbox state from storage. // We first try to fetch the sandbox state from storage.
// If it exists, this means this is a re-creation, i.e. // If it exists, this means this is a re-creation, i.e.
// we don't need to talk to the guest's agent, but only // we don't need to talk to the guest's agent, but only
// want to create the sandbox and its containers in memory. // want to create the sandbox and its containers in memory.
state, err := s.store.LoadState() /* state, err := s.store.LoadState()
if err == nil && state.State != "" { if err == nil && state.State != "" {
s.state = state s.state = state
return s, nil return s, nil
} }*/
// if sandbox doesn't exist, set persist version to current version
s.persistVersion()
// Below code path is called only during create, because of earlier check. // Below code path is called only during create, because of earlier check.
if err := s.agent.createSandbox(s); err != nil { if err := s.agent.createSandbox(s); err != nil {
@ -536,6 +551,10 @@ func newSandbox(ctx context.Context, sandboxConfig SandboxConfig, factory Factor
s.store = vcStore s.store = vcStore
if s.newStore, err = persist.GetDriver("fs"); err != nil || s.newStore == nil {
return nil, fmt.Errorf("failed to get fs persist driver")
}
if err = globalSandboxList.addSandbox(s); err != nil { if err = globalSandboxList.addSandbox(s); err != nil {
return nil, err return nil, err
} }
@ -586,6 +605,10 @@ func (s *Sandbox) storeSandbox() error {
} }
} }
if err = s.newStore.Dump(); err != nil {
return err
}
return nil return nil
} }
@ -1078,6 +1101,10 @@ func (s *Sandbox) CreateContainer(contConfig ContainerConfig) (VCContainer, erro
return nil, err return nil, err
} }
if err = s.storeSandbox(); err != nil {
return nil, err
}
return c, nil return c, nil
} }
@ -1094,6 +1121,11 @@ func (s *Sandbox) StartContainer(containerID string) (VCContainer, error) {
if err != nil { if err != nil {
return nil, err return nil, err
} }
if err = s.storeSandbox(); err != nil {
return nil, err
}
//Fixme Container delete from sandbox, need to update resources //Fixme Container delete from sandbox, need to update resources
return c, nil return c, nil
@ -1112,6 +1144,9 @@ func (s *Sandbox) StopContainer(containerID string) (VCContainer, error) {
return nil, err return nil, err
} }
if err = s.storeSandbox(); err != nil {
return nil, err
}
return c, nil return c, nil
} }
@ -1124,7 +1159,14 @@ func (s *Sandbox) KillContainer(containerID string, signal syscall.Signal, all b
} }
// Send a signal to the process. // Send a signal to the process.
return c.kill(signal, all) if err := c.kill(signal, all); err != nil {
return err
}
if err = s.storeSandbox(); err != nil {
return err
}
return nil
} }
// DeleteContainer deletes a container from the sandbox // DeleteContainer deletes a container from the sandbox
@ -1158,6 +1200,9 @@ func (s *Sandbox) DeleteContainer(containerID string) (VCContainer, error) {
return nil, err return nil, err
} }
if err = s.storeSandbox(); err != nil {
return nil, err
}
return c, nil return c, nil
} }
@ -1236,7 +1281,13 @@ func (s *Sandbox) UpdateContainer(containerID string, resources specs.LinuxResou
return err return err
} }
return c.storeContainer() if err := c.storeContainer(); err != nil {
return err
}
if err = s.storeSandbox(); err != nil {
return err
}
return nil
} }
// StatsContainer return the stats of a running container // StatsContainer return the stats of a running container
@ -1263,7 +1314,14 @@ func (s *Sandbox) PauseContainer(containerID string) error {
} }
// Pause the container. // Pause the container.
return c.pause() if err := c.pause(); err != nil {
return err
}
if err = s.storeSandbox(); err != nil {
return err
}
return nil
} }
// ResumeContainer resumes a paused container. // ResumeContainer resumes a paused container.
@ -1275,7 +1333,14 @@ func (s *Sandbox) ResumeContainer(containerID string) error {
} }
// Resume the container. // Resume the container.
return c.resume() if err := c.resume(); err != nil {
return err
}
if err = s.storeSandbox(); err != nil {
return err
}
return nil
} }
// createContainers registers all containers to the proxy, create the // createContainers registers all containers to the proxy, create the
@ -1306,6 +1371,9 @@ func (s *Sandbox) createContainers() error {
if err := s.updateCgroups(); err != nil { if err := s.updateCgroups(); err != nil {
return err return err
} }
if err := s.storeSandbox(); err != nil {
return err
}
return nil return nil
} }
@ -1327,6 +1395,10 @@ func (s *Sandbox) Start() error {
} }
} }
if err := s.storeSandbox(); err != nil {
return err
}
s.Logger().Info("Sandbox is started") s.Logger().Info("Sandbox is started")
return nil return nil
@ -1362,7 +1434,15 @@ func (s *Sandbox) Stop() error {
} }
// Remove the network. // Remove the network.
return s.removeNetwork() if err := s.removeNetwork(); err != nil {
return err
}
if err := s.storeSandbox(); err != nil {
return err
}
return nil
} }
// Pause pauses the sandbox // Pause pauses the sandbox
@ -1379,7 +1459,15 @@ func (s *Sandbox) Pause() error {
s.monitor.stop() s.monitor.stop()
} }
return s.pauseSetStates() if err := s.pauseSetStates(); err != nil {
return err
}
if err := s.storeSandbox(); err != nil {
return err
}
return nil
} }
// Resume resumes the sandbox // Resume resumes the sandbox
@ -1388,7 +1476,15 @@ func (s *Sandbox) Resume() error {
return err return err
} }
return s.resumeSetStates() if err := s.resumeSetStates(); err != nil {
return err
}
if err := s.storeSandbox(); err != nil {
return err
}
return nil
} }
// list lists all sandbox running on the host. // list lists all sandbox running on the host.

View File

@ -14,6 +14,7 @@ import (
"path/filepath" "path/filepath"
"testing" "testing"
"github.com/kata-containers/runtime/virtcontainers/persist/fs"
"github.com/kata-containers/runtime/virtcontainers/store" "github.com/kata-containers/runtime/virtcontainers/store"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
) )
@ -104,6 +105,7 @@ func TestMain(m *testing.M) {
// allow the tests to run without affecting the host system. // allow the tests to run without affecting the host system.
store.ConfigStoragePath = filepath.Join(testDir, store.StoragePathSuffix, "config") store.ConfigStoragePath = filepath.Join(testDir, store.StoragePathSuffix, "config")
store.RunStoragePath = filepath.Join(testDir, store.StoragePathSuffix, "run") store.RunStoragePath = filepath.Join(testDir, store.StoragePathSuffix, "run")
fs.TestSetRunStoragePath(filepath.Join(testDir, "vc", "sbs"))
// set now that configStoragePath has been overridden. // set now that configStoragePath has been overridden.
sandboxDirConfig = filepath.Join(store.ConfigStoragePath, testSandboxID) sandboxDirConfig = filepath.Join(store.ConfigStoragePath, testSandboxID)