mirror of
https://github.com/kubernetes/client-go.git
synced 2025-06-23 13:47:19 +00:00
Use checksums instead of fsyncs to manage discovery cache corruption
Part of the API discovery cache uses an HTTP RoundTripper that transparently caches responses to disk. The upstream implementation of the disk cache is hard coded to call Sync() on every file it writes. This has noticably poor performance on modern Macs, which ask their disk controllers to flush all the way to persistant storage because Go uses the `F_FULLFSYNC` fnctl. Apple recommends minimizing this behaviour in order to avoid degrading performance and increasing disk wear. The content of the discovery cache is not critical; it is indeed just a cache and can be recreated by hitting the API servers' discovery endpoints. This commit replaces upstream httpcache's diskcache implementation with a similar implementation that can use CRC-32 checksums to detect corrupted cache entries at read-time. When such an entry is detected (e.g. because it was only partially flushed to permanent storage before the host lost power) the cache will report a miss. This causes httpcache to fall back to its underlying HTTP transport (i.e. the real API server) and re-cache the resulting value. Apart from adding CRC-32 checksums and avoiding calling fsync this implementation differs from upstream httpcache's diskcache package in that it uses FNV-32a hashes rather than MD5 hashes of cache keys in order to generate filenames. Signed-off-by: Nic Cope <nicc@rk0n.org> Kubernetes-commit: 7a2c6a432f9e8db8b84abe5607843429f8bff417
This commit is contained in:
parent
76fccca0ea
commit
1ea239faa5
@ -17,12 +17,15 @@ limitations under the License.
|
||||
package disk
|
||||
|
||||
import (
|
||||
"encoding/binary"
|
||||
"fmt"
|
||||
"hash/crc32"
|
||||
"hash/fnv"
|
||||
"net/http"
|
||||
"os"
|
||||
"path/filepath"
|
||||
|
||||
"github.com/gregjones/httpcache"
|
||||
"github.com/gregjones/httpcache/diskcache"
|
||||
"github.com/peterbourgon/diskv"
|
||||
"k8s.io/klog/v2"
|
||||
)
|
||||
@ -41,7 +44,7 @@ func newCacheRoundTripper(cacheDir string, rt http.RoundTripper) http.RoundTripp
|
||||
BasePath: cacheDir,
|
||||
TempDir: filepath.Join(cacheDir, ".diskv-temp"),
|
||||
})
|
||||
t := httpcache.NewTransport(diskcache.NewWithDiskv(d))
|
||||
t := httpcache.NewTransport(&crcDiskCache{disk: d})
|
||||
t.Transport = rt
|
||||
|
||||
return &cacheRoundTripper{rt: t}
|
||||
@ -63,3 +66,54 @@ func (rt *cacheRoundTripper) CancelRequest(req *http.Request) {
|
||||
}
|
||||
|
||||
func (rt *cacheRoundTripper) WrappedRoundTripper() http.RoundTripper { return rt.rt.Transport }
|
||||
|
||||
// A crcDiskCache is a cache backend for github.com/gregjones/httpcache. It is
|
||||
// similar to httpcache's diskcache package, but uses checksums to ensure cache
|
||||
// integrity rather than fsyncing each cache entry in order to avoid performance
|
||||
// degradation on MacOS.
|
||||
//
|
||||
// See https://github.com/kubernetes/kubernetes/issues/110753 for more.
|
||||
type crcDiskCache struct {
|
||||
disk *diskv.Diskv
|
||||
}
|
||||
|
||||
// Get the requested key from the cache on disk. If Get encounters an error, or
|
||||
// the returned value is not a CRC-32 checksum followed by bytes with a matching
|
||||
// checksum it will return false to indicate a cache miss.
|
||||
func (c *crcDiskCache) Get(key string) ([]byte, bool) {
|
||||
b, err := c.disk.Read(sanitize(key))
|
||||
if err != nil || len(b) < binary.MaxVarintLen32 {
|
||||
return []byte{}, false
|
||||
}
|
||||
|
||||
response := b[binary.MaxVarintLen32:]
|
||||
sum, _ := binary.Uvarint(b[:binary.MaxVarintLen32])
|
||||
if crc32.ChecksumIEEE(response) != uint32(sum) {
|
||||
return []byte{}, false
|
||||
}
|
||||
|
||||
return response, true
|
||||
}
|
||||
|
||||
// Set writes the response to a file on disk. The filename will be the FNV-32a
|
||||
// hash of the key. The file will contain the CRC-32 checksum of the response
|
||||
// bytes, followed by said response bytes.
|
||||
func (c *crcDiskCache) Set(key string, response []byte) {
|
||||
sum := make([]byte, binary.MaxVarintLen32)
|
||||
_ = binary.PutUvarint(sum, uint64(crc32.ChecksumIEEE(response)))
|
||||
_ = c.disk.Write(sanitize(key), append(sum, response...)) // Nothing we can do with this error.
|
||||
}
|
||||
|
||||
func (c *crcDiskCache) Delete(key string) {
|
||||
_ = c.disk.Erase(sanitize(key)) // Nothing we can do with this error.
|
||||
}
|
||||
|
||||
// Sanitize an httpcache key such that it can be used as a diskv key, which must
|
||||
// be a valid filename. The httpcache key will either be the requested URL (if
|
||||
// the request method was GET) or "<method> <url>" for other methods, per the
|
||||
// httpcache.cacheKey function.
|
||||
func sanitize(key string) string {
|
||||
h := fnv.New32a()
|
||||
_, _ = h.Write([]byte(key)) // Writing to a hash never returns an error.
|
||||
return fmt.Sprintf("%X", h.Sum32())
|
||||
}
|
||||
|
@ -18,6 +18,8 @@ package disk
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/binary"
|
||||
"hash/crc32"
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
"net/url"
|
||||
@ -25,7 +27,6 @@ import (
|
||||
"path/filepath"
|
||||
"testing"
|
||||
|
||||
"github.com/gregjones/httpcache/diskcache"
|
||||
"github.com/peterbourgon/diskv"
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
@ -42,9 +43,6 @@ func (rt *testRoundTripper) RoundTrip(req *http.Request) (*http.Response, error)
|
||||
return rt.Response, rt.Err
|
||||
}
|
||||
|
||||
// NOTE(negz): We're adding a benchmark for an external dependency in order to
|
||||
// prove that one that will be added in a subsequent commit improves write
|
||||
// performance.
|
||||
func BenchmarkDiskCache(b *testing.B) {
|
||||
cacheDir, err := ioutil.TempDir("", "cache-rt")
|
||||
if err != nil {
|
||||
@ -65,7 +63,7 @@ func BenchmarkDiskCache(b *testing.B) {
|
||||
b.Fatal(err)
|
||||
}
|
||||
|
||||
c := diskcache.NewWithDiskv(d)
|
||||
c := crcDiskCache{disk: d}
|
||||
|
||||
for n := 0; n < b.N; n++ {
|
||||
c.Set(k, v)
|
||||
@ -179,3 +177,147 @@ func TestCacheRoundTripperPathPerm(t *testing.T) {
|
||||
})
|
||||
assert.NoError(err)
|
||||
}
|
||||
|
||||
func TestCRCDiskCache(t *testing.T) {
|
||||
assert := assert.New(t)
|
||||
|
||||
// Ensure that we'll return a cache miss if the backing file doesn't exist.
|
||||
t.Run("NoSuchKey", func(t *testing.T) {
|
||||
cacheDir, err := ioutil.TempDir("", "cache-crc")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer os.RemoveAll(cacheDir)
|
||||
d := diskv.New(diskv.Options{BasePath: cacheDir, TempDir: filepath.Join(cacheDir, ".diskv-temp")})
|
||||
crc := &crcDiskCache{disk: d}
|
||||
|
||||
key := "testing"
|
||||
|
||||
got, ok := crc.Get(key)
|
||||
assert.False(ok)
|
||||
assert.Equal([]byte{}, got)
|
||||
})
|
||||
|
||||
// Ensure that we'll return a cache miss if the backing file is empty.
|
||||
t.Run("EmptyFile", func(t *testing.T) {
|
||||
cacheDir, err := ioutil.TempDir("", "cache-crc")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer os.RemoveAll(cacheDir)
|
||||
d := diskv.New(diskv.Options{BasePath: cacheDir, TempDir: filepath.Join(cacheDir, ".diskv-temp")})
|
||||
crc := &crcDiskCache{disk: d}
|
||||
|
||||
key := "testing"
|
||||
|
||||
f, err := os.Create(filepath.Join(cacheDir, sanitize(key)))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
f.Close()
|
||||
|
||||
got, ok := crc.Get(key)
|
||||
assert.False(ok)
|
||||
assert.Equal([]byte{}, got)
|
||||
})
|
||||
|
||||
// Ensure that we'll return a cache miss if the backing has an invalid
|
||||
// checksum.
|
||||
t.Run("InvalidChecksum", func(t *testing.T) {
|
||||
cacheDir, err := ioutil.TempDir("", "cache-crc")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer os.RemoveAll(cacheDir)
|
||||
d := diskv.New(diskv.Options{BasePath: cacheDir, TempDir: filepath.Join(cacheDir, ".diskv-temp")})
|
||||
crc := &crcDiskCache{disk: d}
|
||||
|
||||
key := "testing"
|
||||
value := []byte("testing")
|
||||
mismatchedValue := []byte("testink")
|
||||
sum := make([]byte, binary.MaxVarintLen32)
|
||||
binary.PutUvarint(sum, uint64(crc32.ChecksumIEEE(value)))
|
||||
|
||||
// Create a file with the checksum of 'value' followed by the bytes of
|
||||
// 'mismatchedValue'.
|
||||
f, err := os.Create(filepath.Join(cacheDir, sanitize(key)))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
f.Write(sum)
|
||||
f.Write(mismatchedValue)
|
||||
f.Close()
|
||||
|
||||
// The mismatched checksum should result in a cache miss.
|
||||
got, ok := crc.Get(key)
|
||||
assert.False(ok)
|
||||
assert.Equal([]byte{}, got)
|
||||
})
|
||||
|
||||
// Ensure that our disk cache will happily cache over the top of an existing
|
||||
// value. We depend on this behaviour to recover from corrupted cache
|
||||
// entries. When Get detects a bad checksum it will return a cache miss.
|
||||
// This should cause httpcache to fall back to its underlying transport and
|
||||
// to subsequently cache the new value, overwriting the corrupt one.
|
||||
t.Run("OverwriteExistingKey", func(t *testing.T) {
|
||||
cacheDir, err := ioutil.TempDir("", "cache-crc")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer os.RemoveAll(cacheDir)
|
||||
d := diskv.New(diskv.Options{BasePath: cacheDir, TempDir: filepath.Join(cacheDir, ".diskv-temp")})
|
||||
crc := &crcDiskCache{disk: d}
|
||||
|
||||
key := "testing"
|
||||
value := []byte("cool value!")
|
||||
|
||||
// Write a value.
|
||||
crc.Set(key, value)
|
||||
got, ok := crc.Get(key)
|
||||
|
||||
// Ensure we can read back what we wrote.
|
||||
assert.True(ok)
|
||||
assert.Equal(value, got)
|
||||
|
||||
differentValue := []byte("I'm different!")
|
||||
|
||||
// Write a different value.
|
||||
crc.Set(key, differentValue)
|
||||
got, ok = crc.Get(key)
|
||||
|
||||
// Ensure we can read back the different value.
|
||||
assert.True(ok)
|
||||
assert.Equal(differentValue, got)
|
||||
})
|
||||
|
||||
// Ensure that deleting a key does in fact delete it.
|
||||
t.Run("DeleteKey", func(t *testing.T) {
|
||||
cacheDir, err := ioutil.TempDir("", "cache-crc")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer os.RemoveAll(cacheDir)
|
||||
d := diskv.New(diskv.Options{BasePath: cacheDir, TempDir: filepath.Join(cacheDir, ".diskv-temp")})
|
||||
crc := &crcDiskCache{disk: d}
|
||||
|
||||
key := "testing"
|
||||
value := []byte("coolValue")
|
||||
|
||||
crc.Set(key, value)
|
||||
|
||||
// Ensure we successfully set the value.
|
||||
got, ok := crc.Get(key)
|
||||
assert.True(ok)
|
||||
assert.Equal(value, got)
|
||||
|
||||
crc.Delete(key)
|
||||
|
||||
// Ensure the value is gone.
|
||||
got, ok = crc.Get(key)
|
||||
assert.False(ok)
|
||||
assert.Equal([]byte{}, got)
|
||||
|
||||
// Ensure that deleting a non-existent value is a no-op.
|
||||
crc.Delete(key)
|
||||
})
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user