Add cache to avoid RAM consumption

When we process huge file lists we can load too much data into RAM, which
can cause OOMs on certain devices. Use a smart cache instead, which
automatically spills over to disk when necessary.
This commit is contained in:
Ettore Di Giacinto
2021-10-26 15:42:48 +02:00
parent 35fcd868ee
commit b974f44095
3 changed files with 290 additions and 10 deletions

172
pkg/api/core/image/cache.go Normal file
View File

@@ -0,0 +1,172 @@
// Copyright © 2021 Ettore Di Giacinto <mudler@mocaccino.org>
//
// This program is free software; you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation; either version 2 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License along
// with this program; if not, see <http://www.gnu.org/licenses/>.
package image
import (
"encoding/json"
"fmt"
"os"
"strings"
"github.com/peterbourgon/diskv"
)
// Cache is a string key-value store that starts out in memory and moves its
// contents to disk once a configured number of items is reached.
type Cache struct {
	// store is the diskv-backed store used once the cache is on disk.
	store *diskv.Diskv
	// memory holds the entries while the cache is still in memory mode.
	memory map[string]string
	// dir is the directory backing the disk store; Clean removes it.
	dir string
	// onDisk reports whether entries are currently read from and written to store.
	onDisk bool
	// maxmemorySize is the value handed to diskv as its CacheSizeMax.
	maxmemorySize int
	// maxItemSize is the item count at which Set moves the cache to disk;
	// zero disables the move.
	maxItemSize int
}
// NewCache creates a new key-value cache rooted at path.
//
// The cache works purely in memory until it holds maxItemsize items. When that
// threshold is met the entries are offloaded to disk under path, and
// maxmemorySize is passed to diskv as CacheSizeMax, the size of the in-memory
// buffer diskv keeps in front of the files. A maxItemsize of zero keeps the
// cache in memory.
func NewCache(path string, maxmemorySize, maxItemsize int) *Cache {
	opts := diskv.Options{
		BasePath:     path,
		CacheSizeMax: uint64(maxmemorySize),
	}

	c := &Cache{
		dir:           path,
		maxItemSize:   maxItemsize,
		maxmemorySize: maxmemorySize,
	}
	c.store = diskv.New(opts)
	c.memory = make(map[string]string)
	return c
}
// cleanKey turns a key into a name that is safe to use for a file of the disk
// store: every OS path separator is replaced by an underscore, because the
// disk cache keeps each entry as a separate file.
//
// XXX: the mapping is lossy, so when looping over results from disk the
// original key name can no longer be recovered.
// No hashing is done on purpose, to avoid any performance impact.
func cleanKey(s string) string {
	sep := string(os.PathSeparator)
	return strings.Replace(s, sep, "_", -1)
}
// Count returns the number of items in the cache.
// When the cache is on disk every stored key is enumerated, which can be an
// expensive call.
func (c *Cache) Count() int {
	if c.onDisk {
		n := 0
		for range c.store.Keys(nil) {
			n++
		}
		return n
	}
	return len(c.memory)
}
// Get attempts to retrieve the value stored for key.
// In memory mode the key is looked up as given; in disk mode it is first
// sanitised with cleanKey. found is false when the key is not in memory or
// when reading it from the disk store fails.
func (c *Cache) Get(key string) (value string, found bool) {
	if c.onDisk {
		raw, err := c.store.Read(cleanKey(key))
		return string(raw), err == nil
	}

	value, found = c.memory[key]
	return value, found
}
// flushToDisk writes every in-memory entry to the disk store (under its
// sanitised key) and switches the cache to disk mode.
//
// If a write fails, the error is returned and the cache stays in memory mode
// with all of its entries intact, so nothing is silently lost. Entries that
// were already written are simply rewritten by the next flush attempt.
func (c *Cache) flushToDisk() error {
	for k, v := range c.memory {
		if err := c.store.Write(cleanKey(k), []byte(v)); err != nil {
			return fmt.Errorf("flushing cache key %q to disk: %w", k, err)
		}
	}
	c.memory = make(map[string]string)
	c.onDisk = true
	return nil
}

// Set updates or inserts a new value.
//
// While the cache is in memory and the item threshold (maxItemSize, when
// non-zero) has been reached, the cache is first moved to disk. It returns an
// error if moving the cache to disk or writing the value to disk fails.
func (c *Cache) Set(key, value string) error {
	if !c.onDisk && c.maxItemSize != 0 && len(c.memory) >= c.maxItemSize {
		if err := c.flushToDisk(); err != nil {
			return err
		}
	}

	if c.onDisk {
		return c.store.Write(cleanKey(key), []byte(value))
	}

	// A zero-value Cache has a nil map; writing to it would panic.
	if c.memory == nil {
		c.memory = make(map[string]string)
	}
	c.memory[key] = value
	return nil
}
// SetValue updates or inserts a new value by marshalling it into JSON.
// It returns the marshalling error, or any error reported by Set.
func (c *Cache) SetValue(key string, value interface{}) error {
	dat, err := json.Marshal(value)
	if err != nil {
		return err
	}

	// The key is passed through untouched: Set sanitises it itself when it
	// writes to disk. Sanitising it here as well would store the entry under a
	// different key while the cache is in memory, so a later Get with the
	// original key would miss.
	return c.Set(key, string(dat))
}
// CacheResult is a single key/value pair handed to the callback while
// iterating over the cache.
type CacheResult struct {
	key   string
	value string
}

// Value returns the underlying value.
func (r CacheResult) Value() string {
	return r.value
}

// Key returns the cache result key.
func (r CacheResult) Key() string {
	return r.key
}

// Unmarshal decodes the JSON-encoded value into i. Use it to retrieve data
// that was stored with SetValue.
func (r CacheResult) Unmarshal(i interface{}) error {
	raw := []byte(r.value)
	return json.Unmarshal(raw, i)
}
// All iterates over the cache, calling fn once for every stored entry.
// In disk mode the keys handed to fn are the names used by the disk store,
// and a key whose value cannot be read makes All panic.
func (c *Cache) All(fn func(CacheResult)) {
	if c.onDisk {
		for key := range c.store.Keys(nil) {
			raw, err := c.store.Read(key)
			if err != nil {
				panic(fmt.Sprintf("key %s had no value", key))
			}
			fn(CacheResult{key: key, value: string(raw)})
		}
		return
	}

	for k, v := range c.memory {
		fn(CacheResult{key: k, value: v})
	}
}
// Clean empties the cache, switches it back to memory mode and removes the
// backing directory from disk. Removal is best-effort: errors are ignored.
func (c *Cache) Clean() {
	c.memory = make(map[string]string)
	c.onDisk = false

	if c.store != nil {
		// EraseAll removes the files and also drops diskv's internal read
		// cache, so a Cache reused after Clean cannot serve stale values.
		_ = c.store.EraseAll()
	}
	// Still remove the directory itself; this also covers a Cache that has
	// no store.
	_ = os.RemoveAll(c.dir)
}

View File

@@ -0,0 +1,98 @@
// Copyright © 2021 Ettore Di Giacinto <mudler@gentoo.org>
//
// This program is free software; you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation; either version 2 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License along
// with this program; if not, see <http://www.gnu.org/licenses/>.
package image_test
import (
"path/filepath"
. "github.com/mudler/luet/pkg/api/core/image"
"github.com/mudler/luet/pkg/api/core/types"
"github.com/mudler/luet/pkg/helpers/file"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
)
// Specs for Cache: plain key/value usage, the automatic move from memory to
// disk, and JSON round-tripping through SetValue/Unmarshal.
var _ = Describe("Cache", func() {
	ctx := types.NewContext()

	Context("used as k/v store", func() {
		cache := &Cache{}
		var dir string

		BeforeEach(func() {
			ctx = types.NewContext()
			var err error
			dir, err = ctx.Config.System.TempDir("foo")
			Expect(err).ToNot(HaveOccurred())
			// 10MB cache when upgrading to files. Max volatile memory of 1 row.
			cache = NewCache(dir, 10*1024*1024, 1)
		})

		AfterEach(func() {
			cache.Clean()
		})

		It("does handle automatically memory upgrade", func() {
			// The first item stays in memory: nothing is written to dir yet.
			Expect(cache.Set("foo", "bar")).ToNot(HaveOccurred())
			v, found := cache.Get("foo")
			Expect(found).To(BeTrue())
			Expect(v).To(Equal("bar"))
			Expect(file.Exists(filepath.Join(dir, "foo"))).To(BeFalse())

			// The second item reaches the threshold: both entries go to disk.
			Expect(cache.Set("baz", "bar")).ToNot(HaveOccurred())
			Expect(file.Exists(filepath.Join(dir, "foo"))).To(BeTrue())
			Expect(file.Exists(filepath.Join(dir, "baz"))).To(BeTrue())

			v, found = cache.Get("foo")
			Expect(found).To(BeTrue())
			Expect(v).To(Equal("bar"))
			Expect(cache.Count()).To(Equal(2))
		})

		It("does CRUD", func() {
			Expect(cache.Set("foo", "bar")).ToNot(HaveOccurred())

			v, found := cache.Get("foo")
			Expect(found).To(BeTrue())
			Expect(v).To(Equal("bar"))

			hit := false
			cache.All(func(c CacheResult) {
				hit = true
				Expect(c.Key()).To(Equal("foo"))
				Expect(c.Value()).To(Equal("bar"))
			})
			Expect(hit).To(BeTrue())
		})

		It("Unmarshals values", func() {
			type testStruct struct {
				Test string
			}

			Expect(cache.SetValue("foo", &testStruct{Test: "baz"})).ToNot(HaveOccurred())

			n := &testStruct{}
			cache.All(func(cr CacheResult) {
				err := cr.Unmarshal(n)
				Expect(err).ToNot(HaveOccurred())
			})
			Expect(n.Test).To(Equal("baz"))
		})
	})
})

View File

@@ -45,8 +45,12 @@ func ExtractDeltaAdditionsFiles(
includeRegexp := compileRegexes(includes) includeRegexp := compileRegexes(includes)
excludeRegexp := compileRegexes(excludes) excludeRegexp := compileRegexes(excludes)
filesSrc := map[string]interface{}{}
srcfilesd, err := ctx.Config.System.TempDir("srcfiles")
if err != nil {
return nil, err
}
filesSrc := NewCache(srcfilesd, 50*1024*1024, 10000)
srcReader := mutate.Extract(srcimg) srcReader := mutate.Extract(srcimg)
defer srcReader.Close() defer srcReader.Close()
@@ -63,12 +67,12 @@ func ExtractDeltaAdditionsFiles(
if err != nil { if err != nil {
return nil, err return nil, err
} }
filesSrc[hdr.Name] = nil filesSrc.Set(hdr.Name, "")
} }
return func(h *tar.Header) (bool, error) { return func(h *tar.Header) (bool, error) {
fileName := filepath.Join(string(os.PathSeparator), h.Name) fileName := filepath.Join(string(os.PathSeparator), h.Name)
_, exists := filesSrc[h.Name] _, exists := filesSrc.Get(h.Name)
if exists { if exists {
return false, nil return false, nil
} }
@@ -194,17 +198,22 @@ func ExtractReader(ctx *types.Context, reader io.ReadCloser, output string, keep
Name string Name string
} }
perms := []permData{} permstore, err := ctx.Config.System.TempDir("permstore")
if err != nil {
return 0, "", err
}
perms := NewCache(permstore, 50*1024*1024, 10000)
f := func(h *tar.Header) (bool, error) { f := func(h *tar.Header) (bool, error) {
res, err := filter(h) res, err := filter(h)
if res { if res {
perms = append(perms, permData{ perms.SetValue(h.Name, permData{
PAX: h.PAXRecords, PAX: h.PAXRecords,
Uid: h.Uid, Gid: h.Gid, Uid: h.Uid, Gid: h.Gid,
Xattrs: h.Xattrs, Xattrs: h.Xattrs,
Name: h.Name, Name: h.Name,
}) })
//perms = append(perms, })
} }
return res, err return res, err
} }
@@ -219,8 +228,10 @@ func ExtractReader(ctx *types.Context, reader io.ReadCloser, output string, keep
// Reconstruct permissions // Reconstruct permissions
if keepPerms { if keepPerms {
ctx.Info("Reconstructing permissions") ctx.Debug("Reconstructing permissions")
for _, p := range perms { perms.All(func(cr CacheResult) {
p := &permData{}
cr.Unmarshal(p)
ff := filepath.Join(output, p.Name) ff := filepath.Join(output, p.Name)
if _, err := os.Lstat(ff); err == nil { if _, err := os.Lstat(ff); err == nil {
if err := os.Lchown(ff, p.Uid, p.Gid); err != nil { if err := os.Lchown(ff, p.Uid, p.Gid); err != nil {
@@ -236,9 +247,8 @@ func ExtractReader(ctx *types.Context, reader io.ReadCloser, output string, keep
} }
} }
} }
} })
} }
return c, output, nil return c, output, nil
} }