mirror of
https://github.com/rancher/steve.git
synced 2025-09-21 19:37:58 +00:00
@@ -213,7 +213,7 @@ func setup(ctx context.Context, server *Server) error {
|
||||
if server.SQLCache {
|
||||
sqlStore, err := sqlproxy.NewProxyStore(ctx, cols, cf, summaryCache, summaryCache, server.cacheFactory, false)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
return err
|
||||
}
|
||||
|
||||
errStore := proxy.NewErrorStore(
|
||||
|
169
pkg/sqlcache/db/encoding.go
Normal file
169
pkg/sqlcache/db/encoding.go
Normal file
@@ -0,0 +1,169 @@
|
||||
package db
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"compress/gzip"
|
||||
"encoding/gob"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"reflect"
|
||||
"strings"
|
||||
"sync"
|
||||
)
|
||||
|
||||
func init() {
|
||||
// necessary in order to gob/ungob unstructured.Unstructured objects
|
||||
gob.Register(map[string]any{})
|
||||
gob.Register([]any{})
|
||||
}
|
||||
|
||||
// encoding abstracts a serialization format (gob or JSON, optionally
// gzip-wrapped) used to persist cached objects.
type encoding interface {
	// Encode serializes an object into the provided writer
	Encode(io.Writer, any) error
	// Decode reads from a provided reader and deserializes into an object
	Decode(io.Reader, any) error
}
|
||||
|
||||
// gobEncoding implements encoding using encoding/gob with one long-lived
// encoder/decoder pair. Reusing a single gob.Encoder lets messages after
// the first one of each type omit the wire type description, producing
// smaller payloads (see registerTypeIfNeeded for the decode-side
// consequences).
//
// Contains mutexes; must not be copied after first use.
type gobEncoding struct {
	// writeLock guards encoder/writeBuf; readLock guards decoder/readBuf.
	writeLock, readLock sync.Mutex
	// writeBuf and readBuf are the fixed streams the encoder and decoder
	// are permanently bound to; each call stages its bytes through them.
	writeBuf, readBuf bytes.Buffer
	// encoder/decoder are created lazily on first use.
	encoder *gob.Encoder
	decoder *gob.Decoder
	// seenTypes records which concrete types have been encoded at least
	// once, so their wire type description is only primed a single time.
	seenTypes map[reflect.Type]struct{}
}
|
||||
|
||||
// Encode serializes obj into w using the shared gob encoder.
//
// The encoder is created lazily and permanently bound to writeBuf: gob
// encoders accumulate per-type state, so reusing one allows later
// messages of an already-seen type to skip the type description. Each
// call stages the serialized bytes in writeBuf before copying them to w.
func (g *gobEncoding) Encode(w io.Writer, obj any) error {
	g.writeLock.Lock()
	defer g.writeLock.Unlock()

	if g.encoder == nil {
		g.encoder = gob.NewEncoder(&g.writeBuf)
	}

	g.writeBuf.Reset()
	if err := g.encoder.Encode(obj); err != nil {
		return err
	}

	// On the first encode of each type, the payload is re-generated so
	// that the stored bytes do not carry a one-time type description;
	// see registerTypeIfNeeded for why this matters on decode.
	if err := g.registerTypeIfNeeded(obj); err != nil {
		return err
	}

	_, err := g.writeBuf.WriteTo(w)
	return err
}
|
||||
|
||||
// registerTypeIfNeeded prevents future decoding errors by running Decode right after the first Encode for an object type
// This is needed when reusing a gob.Encoder, as it assumes the receiving end of those messages is always the same gob.Decoder.
// Due to this assumption, it applies some optimizations, like avoiding sending complete type's information (needed for decoding) if it already did it before.
// This means the first object for each type encoded by a gob.Encoder will have a bigger payload, but more importantly,
// the decoding will fail if this is not the first object being decoded later. This function forces that to prevent this from happening.
func (g *gobEncoding) registerTypeIfNeeded(obj any) error {
	if g.seenTypes == nil {
		// Lazy init keeps the zero value of gobEncoding usable.
		g.seenTypes = make(map[reflect.Type]struct{})
	}

	typ := reflect.TypeOf(obj)
	if _, ok := g.seenTypes[typ]; !ok {
		g.seenTypes[typ] = struct{}{}
		// Consume the current write buffer and re-generate it (will produce a smaller version)
		// Feeding this first, type-description-carrying payload through
		// g.Decode teaches the shared decoder the type's wire format.
		// Safe to call while holding writeLock: Decode uses readLock.
		newObj := reflect.New(typ).Interface()
		if err := g.Decode(bytes.NewReader(g.writeBuf.Bytes()), newObj); err != nil {
			return fmt.Errorf("could not decode %T: %w", obj, err)
		}

		// Re-encode: the encoder has now sent this type once, so this
		// second payload omits the type description.
		g.writeBuf.Reset()
		return g.encoder.Encode(obj)
	}
	return nil
}
|
||||
|
||||
// Decode deserializes the contents of r into `into` using the shared
// gob decoder. Mirroring Encode, the decoder is created lazily and
// permanently bound to readBuf: it must be the single receiving end of
// everything the paired encoder produces so its per-type state stays in
// sync with the encoder's.
func (g *gobEncoding) Decode(r io.Reader, into any) error {
	g.readLock.Lock()
	defer g.readLock.Unlock()

	if g.decoder == nil {
		g.decoder = gob.NewDecoder(&g.readBuf)
	}

	// Stage the full payload into readBuf, the only stream the decoder
	// reads from.
	g.readBuf.Reset()
	if _, err := g.readBuf.ReadFrom(r); err != nil {
		return err
	}
	return g.decoder.Decode(into)
}
|
||||
|
||||
// jsonEncoding implements encoding using encoding/json. The zero value
// produces compact output; a positive indentLevel pretty-prints with
// that many spaces per nesting level.
type jsonEncoding struct {
	indentLevel int
}
|
||||
|
||||
func (j jsonEncoding) Encode(w io.Writer, obj any) error {
|
||||
enc := json.NewEncoder(w)
|
||||
if j.indentLevel > 0 {
|
||||
enc.SetIndent("", strings.Repeat(" ", j.indentLevel))
|
||||
}
|
||||
|
||||
if err := enc.Encode(obj); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (j jsonEncoding) Decode(r io.Reader, into any) error {
|
||||
return json.NewDecoder(r).Decode(into)
|
||||
}
|
||||
|
||||
// gzipEncoding wraps another encoding, compressing its output on Encode
// and decompressing its input on Decode. gzip.Writer and gzip.Reader
// instances are pooled to avoid reallocating them on every call.
type gzipEncoding struct {
	encoding
	// writers pools *gzip.Writer, readers pools *gzip.Reader.
	writers sync.Pool
	readers sync.Pool
}
|
||||
|
||||
func gzipped(wrapped encoding) *gzipEncoding {
|
||||
gz := gzipEncoding{
|
||||
encoding: wrapped,
|
||||
}
|
||||
return &gz
|
||||
}
|
||||
|
||||
func (gz *gzipEncoding) Encode(w io.Writer, obj any) error {
|
||||
gzw, ok := gz.writers.Get().(*gzip.Writer)
|
||||
if !ok {
|
||||
gzw = gzip.NewWriter(io.Discard)
|
||||
}
|
||||
gzw.Reset(w)
|
||||
defer func() {
|
||||
gzw.Reset(nil)
|
||||
gz.writers.Put(gzw)
|
||||
}()
|
||||
|
||||
if err := gz.encoding.Encode(gzw, obj); err != nil {
|
||||
return err
|
||||
}
|
||||
return gzw.Close()
|
||||
}
|
||||
|
||||
func (gz *gzipEncoding) Decode(r io.Reader, into any) error {
|
||||
gzr, ok := gz.readers.Get().(*gzip.Reader)
|
||||
if ok {
|
||||
if err := gzr.Reset(r); err != nil {
|
||||
return err
|
||||
}
|
||||
} else {
|
||||
var err error
|
||||
gzr, err = gzip.NewReader(r)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
defer func() {
|
||||
gzr.Close()
|
||||
gz.readers.Put(gzr)
|
||||
}()
|
||||
|
||||
if err := gz.encoding.Decode(gzr, into); err != nil {
|
||||
return err
|
||||
}
|
||||
return gzr.Close()
|
||||
}
|
163
pkg/sqlcache/db/encoding_test.go
Normal file
163
pkg/sqlcache/db/encoding_test.go
Normal file
@@ -0,0 +1,163 @@
|
||||
package db
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"io"
|
||||
"math/rand"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
corev1 "k8s.io/api/core/v1"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
)
|
||||
|
||||
// random is the package-level source for generated test data. The fixed
// seed (0) keeps payloads reproducible across benchmark runs.
var random = rand.New(rand.NewSource(0))
|
||||
|
||||
// TestEquality verifies that every supported encoding (gob and JSON,
// plain and gzip-wrapped) round-trips a representative Pod without
// losing or altering data.
func TestEquality(t *testing.T) {
	// Fixture exercising maps (labels/annotations), nested slices
	// (containers, mounts, volumes) and a pointer field (EmptyDir).
	testObject := &corev1.Pod{
		ObjectMeta: metav1.ObjectMeta{
			Namespace:   "default",
			Name:        "test",
			Annotations: map[string]string{"annotation": "test"},
			Labels:      map[string]string{"label": "test"},
		},
		Spec: corev1.PodSpec{
			Containers: []corev1.Container{{
				Name:  "test",
				Image: "testimage",
				VolumeMounts: []corev1.VolumeMount{{
					Name:      "test",
					MountPath: "/test",
				}},
			}},
			Volumes: []corev1.Volume{{
				Name: "test",
				VolumeSource: corev1.VolumeSource{
					EmptyDir: &corev1.EmptyDirVolumeSource{},
				},
			}},
		},
	}
	tests := []struct {
		name     string
		encoding encoding
	}{
		{name: "gob", encoding: &gobEncoding{}},
		{name: "json", encoding: &jsonEncoding{}},
		{name: "gob+gz", encoding: gzipped(&gobEncoding{})},
		{name: "json+gz", encoding: gzipped(&jsonEncoding{})},
	}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			var buf bytes.Buffer
			if err := tt.encoding.Encode(&buf, testObject); err != nil {
				t.Fatal(err)
			}
			// Decode through a **Pod (dest starts nil) so the decoder
			// allocates the destination value itself.
			var dest *corev1.Pod
			if err := tt.encoding.Decode(bytes.NewReader(buf.Bytes()), &dest); err != nil {
				t.Fatal(err)
			}
			assert.Equal(t, testObject, dest)
		})
	}
}
|
||||
|
||||
func BenchmarkEncodings(b *testing.B) {
|
||||
cm := &corev1.ConfigMap{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Namespace: "default",
|
||||
Name: "test",
|
||||
},
|
||||
// a single entry map with 10K zero characters should account for ~10KB ConfigMap size,
|
||||
Data: generateTestData(10000),
|
||||
}
|
||||
pod := &corev1.Pod{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Namespace: "default",
|
||||
Name: "test",
|
||||
Annotations: map[string]string{"annotation": "test"},
|
||||
Labels: map[string]string{"label": "test"},
|
||||
},
|
||||
Spec: corev1.PodSpec{
|
||||
Containers: []corev1.Container{{
|
||||
Name: "test",
|
||||
Image: "testimage",
|
||||
VolumeMounts: []corev1.VolumeMount{{
|
||||
Name: "test",
|
||||
MountPath: "/test",
|
||||
}},
|
||||
}},
|
||||
Volumes: []corev1.Volume{{
|
||||
Name: "test",
|
||||
VolumeSource: corev1.VolumeSource{
|
||||
EmptyDir: &corev1.EmptyDirVolumeSource{},
|
||||
},
|
||||
}},
|
||||
},
|
||||
}
|
||||
|
||||
tests := []struct {
|
||||
name string
|
||||
encoding encoding
|
||||
testObject any
|
||||
}{
|
||||
{name: "gob", encoding: &gobEncoding{}},
|
||||
{name: "json", encoding: &jsonEncoding{}},
|
||||
{name: "gob+gzip", encoding: gzipped(&gobEncoding{})},
|
||||
{name: "json+gzip", encoding: gzipped(&jsonEncoding{})},
|
||||
}
|
||||
for _, tt := range tests {
|
||||
for objType, testObject := range map[string]any{
|
||||
"10KB-configmap": cm,
|
||||
"pod": pod,
|
||||
} {
|
||||
b.Run(tt.name+"-"+objType, func(b *testing.B) {
|
||||
b.Run("encoding", func(b *testing.B) {
|
||||
for b.Loop() {
|
||||
w := new(discardWriter)
|
||||
if err := tt.encoding.Encode(w, testObject); err != nil {
|
||||
b.Error(err)
|
||||
}
|
||||
b.ReportMetric(float64(w.count), "bytes")
|
||||
}
|
||||
})
|
||||
|
||||
var buf bytes.Buffer
|
||||
if err := tt.encoding.Encode(&buf, testObject); err != nil {
|
||||
b.Fatal(err)
|
||||
}
|
||||
serialized := buf.Bytes()
|
||||
b.Run("decoding", func(b *testing.B) {
|
||||
var dest corev1.ConfigMap
|
||||
for b.Loop() {
|
||||
if err := tt.encoding.Decode(bytes.NewReader(serialized), &dest); err != nil {
|
||||
b.Fatal(err)
|
||||
}
|
||||
}
|
||||
})
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// discardWriter throws away everything written to it while keeping a
// running total of the bytes seen; used to measure serialized payload
// sizes without buffering them.
type discardWriter struct {
	count int
}

// Write discards p via io.Discard and accumulates the written length
// into count.
func (d *discardWriter) Write(p []byte) (int, error) {
	n, err := io.Discard.Write(p)
	d.count += n
	return n, err
}
|
||||
|
||||
// single-entry map, whose value is a randomly-generated string of n size
|
||||
func generateTestData(n int) map[string]string {
|
||||
const letters = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"
|
||||
b := make([]byte, n)
|
||||
for i := range b {
|
||||
b[i] = letters[random.Int63()%int64(len(letters))]
|
||||
}
|
||||
return map[string]string{
|
||||
"data": string(b),
|
||||
}
|
||||
}
|
@@ -79,13 +79,17 @@ var _ cache.Store = (*Store)(nil)
|
||||
|
||||
// NewStore creates a SQLite-backed cache.Store for objects of the given example type
|
||||
func NewStore(ctx context.Context, example any, keyFunc cache.KeyFunc, c db.Client, shouldEncrypt bool, gvk schema.GroupVersionKind, name string, externalUpdateInfo *sqltypes.ExternalGVKUpdates, selfUpdateInfo *sqltypes.ExternalGVKUpdates) (*Store, error) {
|
||||
exampleType := reflect.TypeOf(example)
|
||||
if exampleType.Kind() != reflect.Ptr {
|
||||
exampleType = reflect.PointerTo(exampleType).Elem()
|
||||
}
|
||||
s := &Store{
|
||||
ctx: ctx,
|
||||
name: name,
|
||||
gvk: gvk,
|
||||
externalUpdateInfo: externalUpdateInfo,
|
||||
selfUpdateInfo: selfUpdateInfo,
|
||||
typ: reflect.TypeOf(example),
|
||||
typ: exampleType,
|
||||
Client: c,
|
||||
keyFunc: keyFunc,
|
||||
shouldEncrypt: shouldEncrypt,
|
||||
|
Reference in New Issue
Block a user