persist: simplify persist api

Fixes #803

Simplify new store API to make the code easier to understand and use.

Signed-off-by: Wei Zhang <zhangwei555@huawei.com>
This commit is contained in:
Wei Zhang 2019-04-22 19:08:33 +08:00
parent 437b3cb2f7
commit 341a988e06
8 changed files with 74 additions and 126 deletions

View File

@ -392,7 +392,7 @@ func (c *Container) GetAnnotations() map[string]string {
// storeContainer stores a container config. // storeContainer stores a container config.
func (c *Container) storeContainer() error { func (c *Container) storeContainer() error {
if c.sandbox.supportNewStore() { if c.sandbox.supportNewStore() {
if err := c.sandbox.newStore.ToDisk(); err != nil { if err := c.sandbox.Save(); err != nil {
return err return err
} }
} }
@ -447,7 +447,7 @@ func (c *Container) setContainerState(state types.StateString) error {
if c.sandbox.supportNewStore() { if c.sandbox.supportNewStore() {
// flush data to storage // flush data to storage
if err := c.sandbox.newStore.ToDisk(); err != nil { if err := c.sandbox.Save(); err != nil {
return err return err
} }
} else { } else {

View File

@ -19,7 +19,16 @@ var (
errContainerPersistNotExist = errors.New("container doesn't exist in persist data") errContainerPersistNotExist = errors.New("container doesn't exist in persist data")
) )
func (s *Sandbox) dumpState(ss *persistapi.SandboxState, cs map[string]persistapi.ContainerState) error { func (s *Sandbox) dumpVersion(ss *persistapi.SandboxState) {
// New created sandbox has a uninitialized `PersistVersion` which should be set to current version when do the first saving;
// Old restored sandbox should keep its original version and shouldn't be modified any more after it's initialized.
ss.PersistVersion = s.state.PersistVersion
if ss.PersistVersion == 0 {
ss.PersistVersion = persistapi.CurPersistVersion
}
}
func (s *Sandbox) dumpState(ss *persistapi.SandboxState, cs map[string]persistapi.ContainerState) {
ss.SandboxContainer = s.id ss.SandboxContainer = s.id
ss.GuestMemoryBlockSizeMB = s.state.GuestMemoryBlockSizeMB ss.GuestMemoryBlockSizeMB = s.state.GuestMemoryBlockSizeMB
ss.State = string(s.state.State) ss.State = string(s.state.State)
@ -45,13 +54,10 @@ func (s *Sandbox) dumpState(ss *persistapi.SandboxState, cs map[string]persistap
delete(cs, id) delete(cs, id)
} }
} }
return nil
} }
func (s *Sandbox) dumpHypervisor(ss *persistapi.SandboxState, cs map[string]persistapi.ContainerState) error { func (s *Sandbox) dumpHypervisor(ss *persistapi.SandboxState, cs map[string]persistapi.ContainerState) {
ss.HypervisorState.BlockIndex = s.state.BlockIndex ss.HypervisorState.BlockIndex = s.state.BlockIndex
return nil
} }
func deviceToDeviceState(devices []api.Device) (dss []persistapi.DeviceState) { func deviceToDeviceState(devices []api.Device) (dss []persistapi.DeviceState) {
@ -61,7 +67,7 @@ func deviceToDeviceState(devices []api.Device) (dss []persistapi.DeviceState) {
return return
} }
func (s *Sandbox) dumpDevices(ss *persistapi.SandboxState, cs map[string]persistapi.ContainerState) error { func (s *Sandbox) dumpDevices(ss *persistapi.SandboxState, cs map[string]persistapi.ContainerState) {
ss.Devices = deviceToDeviceState(s.devManager.GetAllDevices()) ss.Devices = deviceToDeviceState(s.devManager.GetAllDevices())
for id, cont := range s.containers { for id, cont := range s.containers {
@ -90,48 +96,34 @@ func (s *Sandbox) dumpDevices(ss *persistapi.SandboxState, cs map[string]persist
delete(cs, id) delete(cs, id)
} }
} }
return nil
} }
// verSaveCallback set persist data version to current version in runtime func (s *Sandbox) Save() error {
func (s *Sandbox) verSaveCallback() { var (
s.newStore.AddSaveCallback("version", func(ss *persistapi.SandboxState, cs map[string]persistapi.ContainerState) error { ss = persistapi.SandboxState{}
ss.PersistVersion = persistapi.CurPersistVersion cs = make(map[string]persistapi.ContainerState)
return nil )
})
}
// stateSaveCallback register hook to set sandbox and container state to persist s.dumpVersion(&ss)
func (s *Sandbox) stateSaveCallback() { s.dumpState(&ss, cs)
s.newStore.AddSaveCallback("state", s.dumpState) s.dumpHypervisor(&ss, cs)
} s.dumpDevices(&ss, cs)
// hvStateSaveCallback register hook to save hypervisor state to persist data if err := s.newStore.ToDisk(ss, cs); err != nil {
func (s *Sandbox) hvStateSaveCallback() { return err
s.newStore.AddSaveCallback("hypervisor", s.dumpHypervisor)
}
// PersistDevices register hook to save device informations
func (s *Sandbox) devicesSaveCallback() {
s.newStore.AddSaveCallback("devices", s.dumpDevices)
}
func (s *Sandbox) getSbxAndCntStates() (*persistapi.SandboxState, map[string]persistapi.ContainerState, error) {
if err := s.newStore.FromDisk(s.id); err != nil {
return nil, nil, err
} }
return s.newStore.GetStates() return nil
} }
// Restore will restore sandbox data from persist file on disk // Restore will restore sandbox data from persist file on disk
func (s *Sandbox) Restore() error { func (s *Sandbox) Restore() error {
ss, _, err := s.getSbxAndCntStates() ss, _, err := s.newStore.FromDisk(s.id)
if err != nil { if err != nil {
return err return err
} }
s.state.PersistVersion = ss.PersistVersion
s.state.GuestMemoryBlockSizeMB = ss.GuestMemoryBlockSizeMB s.state.GuestMemoryBlockSizeMB = ss.GuestMemoryBlockSizeMB
s.state.BlockIndex = ss.HypervisorState.BlockIndex s.state.BlockIndex = ss.HypervisorState.BlockIndex
s.state.State = types.StateString(ss.State) s.state.State = types.StateString(ss.State)
@ -141,8 +133,9 @@ func (s *Sandbox) Restore() error {
} }
// Restore will restore container data from persist file on disk // Restore will restore container data from persist file on disk
// TODO:
func (c *Container) Restore() error { func (c *Container) Restore() error {
_, cs, err := c.sandbox.getSbxAndCntStates() _, cs, err := c.sandbox.newStore.FromDisk(c.sandbox.id)
if err != nil { if err != nil {
return err return err
} }

View File

@ -8,16 +8,10 @@ package persistapi
// PersistDriver is interface describing operations to save/restore persist data // PersistDriver is interface describing operations to save/restore persist data
type PersistDriver interface { type PersistDriver interface {
// ToDisk flushes data to disk(or other storage media such as a remote db) // ToDisk flushes data to disk(or other storage media such as a remote db)
ToDisk() error ToDisk(SandboxState, map[string]ContainerState) error
// FromDisk will restore all data for sandbox with `sid` from storage. // FromDisk will restore all data for sandbox with `sid` from storage.
// We only support get data for one whole sandbox // We only support get data for one whole sandbox
FromDisk(sid string) error FromDisk(sid string) (SandboxState, map[string]ContainerState, error)
// AddSaveCallback addes callback function named `name` to driver storage list
// The callback functions will be invoked when calling `ToDisk()`, notice that
// callback functions are not order guaranteed,
AddSaveCallback(name string, f SetFunc)
// Destroy will remove everything from storage // Destroy will remove everything from storage
Destroy() error Destroy() error
// GetStates will return SandboxState and ContainerState(s) directly
GetStates() (*SandboxState, map[string]ContainerState, error)
} }

View File

@ -44,7 +44,6 @@ var runStoragePath = filepath.Join("/run", storagePathSuffix, sandboxPathSuffix)
type FS struct { type FS struct {
sandboxState *persistapi.SandboxState sandboxState *persistapi.SandboxState
containerState map[string]persistapi.ContainerState containerState map[string]persistapi.ContainerState
setFuncs map[string]persistapi.SetFunc
lockFile *os.File lockFile *os.File
} }
@ -68,7 +67,6 @@ func Init() (persistapi.PersistDriver, error) {
return &FS{ return &FS{
sandboxState: &persistapi.SandboxState{}, sandboxState: &persistapi.SandboxState{},
containerState: make(map[string]persistapi.ContainerState), containerState: make(map[string]persistapi.ContainerState),
setFuncs: make(map[string]persistapi.SetFunc),
}, nil }, nil
} }
@ -82,11 +80,9 @@ func (fs *FS) sandboxDir() (string, error) {
} }
// ToDisk sandboxState and containerState to disk // ToDisk sandboxState and containerState to disk
func (fs *FS) ToDisk() (retErr error) { func (fs *FS) ToDisk(ss persistapi.SandboxState, cs map[string]persistapi.ContainerState) (retErr error) {
// call registered hooks to set sandboxState and containerState fs.sandboxState = &ss
for _, fun := range fs.setFuncs { fs.containerState = cs
fun(fs.sandboxState, fs.containerState)
}
sandboxDir, err := fs.sandboxDir() sandboxDir, err := fs.sandboxDir()
if err != nil { if err != nil {
@ -146,20 +142,21 @@ func (fs *FS) ToDisk() (retErr error) {
} }
// FromDisk restores state for sandbox with name sid // FromDisk restores state for sandbox with name sid
func (fs *FS) FromDisk(sid string) error { func (fs *FS) FromDisk(sid string) (persistapi.SandboxState, map[string]persistapi.ContainerState, error) {
ss := persistapi.SandboxState{}
if sid == "" { if sid == "" {
return fmt.Errorf("restore requires sandbox id") return ss, nil, fmt.Errorf("restore requires sandbox id")
} }
fs.sandboxState.SandboxContainer = sid fs.sandboxState.SandboxContainer = sid
sandboxDir, err := fs.sandboxDir() sandboxDir, err := fs.sandboxDir()
if err != nil { if err != nil {
return err return ss, nil, err
} }
if err := fs.lock(); err != nil { if err := fs.lock(); err != nil {
return err return ss, nil, err
} }
defer fs.unlock() defer fs.unlock()
@ -167,18 +164,18 @@ func (fs *FS) FromDisk(sid string) error {
sandboxFile := filepath.Join(sandboxDir, persistFile) sandboxFile := filepath.Join(sandboxDir, persistFile)
f, err := os.OpenFile(sandboxFile, os.O_RDONLY, fileMode) f, err := os.OpenFile(sandboxFile, os.O_RDONLY, fileMode)
if err != nil { if err != nil {
return err return ss, nil, err
} }
defer f.Close() defer f.Close()
if err := json.NewDecoder(f).Decode(fs.sandboxState); err != nil { if err := json.NewDecoder(f).Decode(fs.sandboxState); err != nil {
return err return ss, nil, err
} }
// walk sandbox dir and find container // walk sandbox dir and find container
files, err := ioutil.ReadDir(sandboxDir) files, err := ioutil.ReadDir(sandboxDir)
if err != nil { if err != nil {
return err return ss, nil, err
} }
for _, file := range files { for _, file := range files {
@ -194,18 +191,19 @@ func (fs *FS) FromDisk(sid string) error {
if os.IsNotExist(err) { if os.IsNotExist(err) {
continue continue
} }
return err return ss, nil, err
} }
var cstate persistapi.ContainerState var cstate persistapi.ContainerState
if err := json.NewDecoder(cf).Decode(&cstate); err != nil { if err := json.NewDecoder(cf).Decode(&cstate); err != nil {
return err return ss, nil, err
} }
cf.Close() cf.Close()
fs.containerState[cid] = cstate fs.containerState[cid] = cstate
} }
return nil
return *fs.sandboxState, fs.containerState, nil
} }
// Destroy removes everything from disk // Destroy removes everything from disk
@ -221,17 +219,6 @@ func (fs *FS) Destroy() error {
return nil return nil
} }
// GetStates returns SandboxState and ContainerState
func (fs *FS) GetStates() (*persistapi.SandboxState, map[string]persistapi.ContainerState, error) {
return fs.sandboxState, fs.containerState, nil
}
// AddSaveCallback registers processing hooks for Dump
func (fs *FS) AddSaveCallback(name string, f persistapi.SetFunc) {
// only accept last registered hook with same name
fs.setFuncs[name] = f
}
func (fs *FS) lock() error { func (fs *FS) lock() error {
sandboxDir, err := fs.sandboxDir() sandboxDir, err := fs.sandboxDir()
if err != nil { if err != nil {

View File

@ -51,31 +51,21 @@ func TestFsDriver(t *testing.T) {
assert.Nil(t, err) assert.Nil(t, err)
assert.NotNil(t, fs) assert.NotNil(t, fs)
fs.AddSaveCallback("test", func(ss *persistapi.SandboxState, cs map[string]persistapi.ContainerState) error { ss := persistapi.SandboxState{}
return nil cs := make(map[string]persistapi.ContainerState)
})
// missing sandbox container id // missing sandbox container id
assert.NotNil(t, fs.ToDisk()) assert.NotNil(t, fs.ToDisk(ss, cs))
id := "test-fs-driver" id := "test-fs-driver"
// missing sandbox container id
fs.AddSaveCallback("test", func(ss *persistapi.SandboxState, cs map[string]persistapi.ContainerState) error {
ss.SandboxContainer = id ss.SandboxContainer = id
return nil assert.Nil(t, fs.ToDisk(ss, cs))
})
assert.Nil(t, fs.ToDisk())
fs.AddSaveCallback("test1", func(ss *persistapi.SandboxState, cs map[string]persistapi.ContainerState) error {
ss.State = "running"
return nil
})
// try non-existent dir // try non-existent dir
assert.NotNil(t, fs.FromDisk("test-fs")) _, _, err = fs.FromDisk("test-fs")
assert.NotNil(t, err)
// since we didn't call ToDisk, Callbacks are not invoked, and state is still empty in disk file // since we didn't call ToDisk, state is still empty in disk file
assert.Nil(t, fs.FromDisk(id)) ss, cs, err = fs.FromDisk(id)
ss, cs, err := fs.GetStates()
assert.Nil(t, err) assert.Nil(t, err)
assert.NotNil(t, ss) assert.NotNil(t, ss)
assert.Equal(t, len(cs), 0) assert.Equal(t, len(cs), 0)
@ -84,9 +74,9 @@ func TestFsDriver(t *testing.T) {
assert.Equal(t, ss.State, "") assert.Equal(t, ss.State, "")
// flush all to disk // flush all to disk
assert.Nil(t, fs.ToDisk()) ss.State = "running"
assert.Nil(t, fs.FromDisk(id)) assert.Nil(t, fs.ToDisk(ss, cs))
ss, cs, err = fs.GetStates() ss, cs, err = fs.FromDisk(id)
assert.Nil(t, err) assert.Nil(t, err)
assert.NotNil(t, ss) assert.NotNil(t, ss)
assert.Equal(t, len(cs), 0) assert.Equal(t, len(cs), 0)

View File

@ -13,6 +13,7 @@ import (
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/kata-containers/runtime/virtcontainers/device/manager"
exp "github.com/kata-containers/runtime/virtcontainers/experimental" exp "github.com/kata-containers/runtime/virtcontainers/experimental"
"github.com/kata-containers/runtime/virtcontainers/persist" "github.com/kata-containers/runtime/virtcontainers/persist"
"github.com/kata-containers/runtime/virtcontainers/types" "github.com/kata-containers/runtime/virtcontainers/types"
@ -74,6 +75,7 @@ func TestSandboxRestore(t *testing.T) {
sandbox := Sandbox{ sandbox := Sandbox{
id: "test-exp", id: "test-exp",
containers: container, containers: container,
devManager: manager.NewDeviceManager(manager.VirtioSCSI, nil),
hypervisor: &mockHypervisor{}, hypervisor: &mockHypervisor{},
ctx: context.Background(), ctx: context.Background(),
config: &sconfig, config: &sconfig,
@ -89,7 +91,7 @@ func TestSandboxRestore(t *testing.T) {
assert.True(t, os.IsNotExist(err)) assert.True(t, os.IsNotExist(err))
// disk data are empty // disk data are empty
err = sandbox.newStore.ToDisk() err = sandbox.Save()
assert.Nil(t, err) assert.Nil(t, err)
err = sandbox.Restore() err = sandbox.Restore()
@ -98,14 +100,12 @@ func TestSandboxRestore(t *testing.T) {
assert.Equal(t, sandbox.state.GuestMemoryBlockSizeMB, uint32(0)) assert.Equal(t, sandbox.state.GuestMemoryBlockSizeMB, uint32(0))
assert.Equal(t, sandbox.state.BlockIndex, 0) assert.Equal(t, sandbox.state.BlockIndex, 0)
// register callback function // set state data and save again
sandbox.stateSaveCallback()
sandbox.state.State = types.StateString("running") sandbox.state.State = types.StateString("running")
sandbox.state.GuestMemoryBlockSizeMB = uint32(1024) sandbox.state.GuestMemoryBlockSizeMB = uint32(1024)
sandbox.state.BlockIndex = 2 sandbox.state.BlockIndex = 2
// flush data to disk // flush data to disk
err = sandbox.newStore.ToDisk() err = sandbox.Save()
assert.Nil(t, err) assert.Nil(t, err)
// empty the sandbox // empty the sandbox
@ -116,18 +116,5 @@ func TestSandboxRestore(t *testing.T) {
assert.Nil(t, err) assert.Nil(t, err)
assert.Equal(t, sandbox.state.State, types.StateString("running")) assert.Equal(t, sandbox.state.State, types.StateString("running"))
assert.Equal(t, sandbox.state.GuestMemoryBlockSizeMB, uint32(1024)) assert.Equal(t, sandbox.state.GuestMemoryBlockSizeMB, uint32(1024))
// require hvStateSaveCallback to make it savable
assert.Equal(t, sandbox.state.BlockIndex, 0)
// after use hvStateSaveCallbck, BlockIndex can be saved now
sandbox.state.BlockIndex = 2
sandbox.hvStateSaveCallback()
err = sandbox.newStore.ToDisk()
assert.Nil(t, err)
err = sandbox.Restore()
assert.Nil(t, err)
assert.Equal(t, sandbox.state.State, types.StateString("running"))
assert.Equal(t, sandbox.state.GuestMemoryBlockSizeMB, uint32(1024))
assert.Equal(t, sandbox.state.BlockIndex, 2) assert.Equal(t, sandbox.state.BlockIndex, 2)
} }

View File

@ -481,18 +481,10 @@ func createSandbox(ctx context.Context, sandboxConfig SandboxConfig, factory Fac
s.devManager = deviceManager.NewDeviceManager(sandboxConfig.HypervisorConfig.BlockDeviceDriver, devices) s.devManager = deviceManager.NewDeviceManager(sandboxConfig.HypervisorConfig.BlockDeviceDriver, devices)
if s.supportNewStore() { if s.supportNewStore() {
// register persist hook for now, data will be written to disk by ToDisk()
s.stateSaveCallback()
s.hvStateSaveCallback()
s.devicesSaveCallback()
if err := s.Restore(); err == nil && s.state.State != "" { if err := s.Restore(); err == nil && s.state.State != "" {
return s, nil return s, nil
} }
// if sandbox doesn't exist, set persist version to current version
// otherwise do nothing
s.verSaveCallback()
} else { } else {
// We first try to fetch the sandbox state from storage. // We first try to fetch the sandbox state from storage.
// If it exists, this means this is a re-creation, i.e. // If it exists, this means this is a re-creation, i.e.
@ -619,7 +611,7 @@ func (s *Sandbox) storeSandbox() error {
if s.supportNewStore() { if s.supportNewStore() {
// flush data to storage // flush data to storage
if err := s.newStore.ToDisk(); err != nil { if err := s.Save(); err != nil {
return err return err
} }
} }

View File

@ -43,6 +43,11 @@ type SandboxState struct {
// CgroupPath is the cgroup hierarchy where sandbox's processes // CgroupPath is the cgroup hierarchy where sandbox's processes
// including the hypervisor are placed. // including the hypervisor are placed.
CgroupPath string `json:"cgroupPath,omitempty"` CgroupPath string `json:"cgroupPath,omitempty"`
// PersistVersion indicates current storage api version.
// It's also known as ABI version of kata-runtime.
// Note: it won't be written to disk
PersistVersion uint `json:"-"`
} }
// Valid checks that the sandbox state is valid. // Valid checks that the sandbox state is valid.