diff --git a/pkg/server/server.go b/pkg/server/server.go
index 7a1570ec..5552af69 100644
--- a/pkg/server/server.go
+++ b/pkg/server/server.go
@@ -213,7 +213,7 @@ func setup(ctx context.Context, server *Server) error {
 	if server.SQLCache {
 		sqlStore, err := sqlproxy.NewProxyStore(ctx, cols, cf, summaryCache, summaryCache, server.cacheFactory, false)
 		if err != nil {
-			panic(err)
+			return err
 		}

 		errStore := proxy.NewErrorStore(
diff --git a/pkg/sqlcache/db/encoding.go b/pkg/sqlcache/db/encoding.go
new file mode 100644
index 00000000..101450a2
--- /dev/null
+++ b/pkg/sqlcache/db/encoding.go
@@ -0,0 +1,169 @@
+package db
+
+import (
+	"bytes"
+	"compress/gzip"
+	"encoding/gob"
+	"encoding/json"
+	"fmt"
+	"io"
+	"reflect"
+	"strings"
+	"sync"
+)
+
+func init() {
+	// necessary in order to gob/ungob unstructured.Unstructured objects
+	gob.Register(map[string]any{})
+	gob.Register([]any{})
+}
+
+type encoding interface {
+	// Encode serializes an object into the provided writer
+	Encode(io.Writer, any) error
+	// Decode reads from a provided reader and deserializes into an object
+	Decode(io.Reader, any) error
+}
+
+type gobEncoding struct {
+	writeLock, readLock sync.Mutex
+	writeBuf, readBuf   bytes.Buffer
+	encoder             *gob.Encoder
+	decoder             *gob.Decoder
+	seenTypes           map[reflect.Type]struct{}
+}
+
+func (g *gobEncoding) Encode(w io.Writer, obj any) error {
+	g.writeLock.Lock()
+	defer g.writeLock.Unlock()
+
+	if g.encoder == nil {
+		g.encoder = gob.NewEncoder(&g.writeBuf)
+	}
+
+	g.writeBuf.Reset()
+	if err := g.encoder.Encode(obj); err != nil {
+		return err
+	}
+
+	if err := g.registerTypeIfNeeded(obj); err != nil {
+		return err
+	}
+
+	_, err := g.writeBuf.WriteTo(w)
+	return err
+}
+
+// registerTypeIfNeeded prevents future decoding errors by running Decode right after the first Encode of an object type.
+// This is needed when reusing a gob.Encoder, because it assumes the receiving end of its messages is always the same gob.Decoder.
+// Under that assumption it applies optimizations such as omitting a type's complete wire definition (needed for decoding) once it has already been sent.
+// As a result, the first object of each type encoded by a gob.Encoder carries a bigger payload and, more importantly,
+// decoding a later payload fails unless that first one was decoded before it. This function forces that to happen.
+func (g *gobEncoding) registerTypeIfNeeded(obj any) error {
+	if g.seenTypes == nil {
+		g.seenTypes = make(map[reflect.Type]struct{})
+	}
+
+	typ := reflect.TypeOf(obj)
+	if _, ok := g.seenTypes[typ]; !ok {
+		g.seenTypes[typ] = struct{}{}
+		// Consume the current write buffer and re-generate it (will produce a smaller version)
+		newObj := reflect.New(typ).Interface()
+		if err := g.Decode(bytes.NewReader(g.writeBuf.Bytes()), newObj); err != nil {
+			return fmt.Errorf("could not decode %T: %w", obj, err)
+		}
+
+		g.writeBuf.Reset()
+		return g.encoder.Encode(obj)
+	}
+	return nil
+}
+
+func (g *gobEncoding) Decode(r io.Reader, into any) error {
+	g.readLock.Lock()
+	defer g.readLock.Unlock()
+
+	if g.decoder == nil {
+		g.decoder = gob.NewDecoder(&g.readBuf)
+	}
+
+	g.readBuf.Reset()
+	if _, err := g.readBuf.ReadFrom(r); err != nil {
+		return err
+	}
+	return g.decoder.Decode(into)
+}
+
+type jsonEncoding struct {
+	indentLevel int
+}
+
+func (j jsonEncoding) Encode(w io.Writer, obj any) error {
+	enc := json.NewEncoder(w)
+	if j.indentLevel > 0 {
+		enc.SetIndent("", strings.Repeat(" ", j.indentLevel))
+	}
+
+	if err := enc.Encode(obj); err != nil {
+		return err
+	}
+	return nil
+}
+
+func (j jsonEncoding) Decode(r io.Reader, into any) error {
+	return json.NewDecoder(r).Decode(into)
+}
+
+type gzipEncoding struct {
+	encoding
+	writers sync.Pool
+	readers sync.Pool
+}
+
+func gzipped(wrapped encoding) *gzipEncoding {
+	gz := gzipEncoding{
+		encoding: wrapped,
+	}
+	return &gz
+}
+
+func (gz *gzipEncoding) Encode(w io.Writer, obj any) error {
+	gzw, ok := gz.writers.Get().(*gzip.Writer)
+	if !ok {
+		gzw = gzip.NewWriter(io.Discard)
+	}
+	gzw.Reset(w)
+	defer func() {
+		gzw.Reset(nil)
+		gz.writers.Put(gzw)
+	}()
+
+	if err := gz.encoding.Encode(gzw, obj); err != nil {
+		return err
+	}
+	return gzw.Close()
+}
+
+func (gz *gzipEncoding) Decode(r io.Reader, into any) error {
+	gzr, ok := gz.readers.Get().(*gzip.Reader)
+	if ok {
+		if err := gzr.Reset(r); err != nil {
+			return err
+		}
+	} else {
+		var err error
+		gzr, err = gzip.NewReader(r)
+		if err != nil {
+			return err
+		}
+	}
+	defer func() {
+		gzr.Close()
+		gz.readers.Put(gzr)
+	}()
+
+	if err := gz.encoding.Decode(gzr, into); err != nil {
+		return err
+	}
+	return gzr.Close()
+}
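The behaviour that registerTypeIfNeeded works around can be reproduced with a few lines of plain encoding/gob: a reused gob.Encoder transmits a type's wire definition only with the first value of that type, so a decoder that never saw that first message cannot decode later ones. A minimal standalone sketch of this, separate from the patch itself (names are illustrative):

package main

import (
	"bytes"
	"encoding/gob"
	"fmt"
)

type item struct{ Name string }

func main() {
	var buf bytes.Buffer
	enc := gob.NewEncoder(&buf)

	// The first Encode of an "item" also writes the type definition.
	if err := enc.Encode(item{Name: "first"}); err != nil {
		panic(err)
	}
	first := append([]byte(nil), buf.Bytes()...)
	buf.Reset()

	// Later Encodes of the same type omit the definition and are smaller.
	if err := enc.Encode(item{Name: "second"}); err != nil {
		panic(err)
	}
	second := append([]byte(nil), buf.Bytes()...)

	var out item
	// A fresh decoder handles the definition-carrying payload fine...
	fmt.Println(gob.NewDecoder(bytes.NewReader(first)).Decode(&out)) // <nil>
	// ...but fails on a later payload seen in isolation.
	fmt.Println(gob.NewDecoder(bytes.NewReader(second)).Decode(&out)) // error
}

Feeding the first, definition-carrying payload through the shared decoder, as registerTypeIfNeeded does, is what keeps subsequent payloads small while still decodable.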
diff --git a/pkg/sqlcache/db/encoding_test.go b/pkg/sqlcache/db/encoding_test.go
new file mode 100644
index 00000000..cefdc7e5
--- /dev/null
+++ b/pkg/sqlcache/db/encoding_test.go
@@ -0,0 +1,163 @@
+package db
+
+import (
+	"bytes"
+	"io"
+	"math/rand"
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+	corev1 "k8s.io/api/core/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+)
+
+var random = rand.New(rand.NewSource(0))
+
+func TestEquality(t *testing.T) {
+	testObject := &corev1.Pod{
+		ObjectMeta: metav1.ObjectMeta{
+			Namespace:   "default",
+			Name:        "test",
+			Annotations: map[string]string{"annotation": "test"},
+			Labels:      map[string]string{"label": "test"},
+		},
+		Spec: corev1.PodSpec{
+			Containers: []corev1.Container{{
+				Name:  "test",
+				Image: "testimage",
+				VolumeMounts: []corev1.VolumeMount{{
+					Name:      "test",
+					MountPath: "/test",
+				}},
+			}},
+			Volumes: []corev1.Volume{{
+				Name: "test",
+				VolumeSource: corev1.VolumeSource{
+					EmptyDir: &corev1.EmptyDirVolumeSource{},
+				},
+			}},
+		},
+	}
+	tests := []struct {
+		name     string
+		encoding encoding
+	}{
+		{name: "gob", encoding: &gobEncoding{}},
+		{name: "json", encoding: &jsonEncoding{}},
+		{name: "gob+gz", encoding: gzipped(&gobEncoding{})},
+		{name: "json+gz", encoding: gzipped(&jsonEncoding{})},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			var buf bytes.Buffer
+			if err := tt.encoding.Encode(&buf, testObject); err != nil {
+				t.Fatal(err)
+			}
+			var dest *corev1.Pod
+			if err := tt.encoding.Decode(bytes.NewReader(buf.Bytes()), &dest); err != nil {
+				t.Fatal(err)
+			}
+			assert.Equal(t, testObject, dest)
+		})
+	}
+}
+
+func BenchmarkEncodings(b *testing.B) {
+	cm := &corev1.ConfigMap{
+		ObjectMeta: metav1.ObjectMeta{
+			Namespace: "default",
+			Name:      "test",
+		},
+		// a single-entry map with a 10K-character value accounts for roughly 10KB of ConfigMap data
+		Data: generateTestData(10000),
+	}
+	pod := &corev1.Pod{
+		ObjectMeta: metav1.ObjectMeta{
+			Namespace:   "default",
+			Name:        "test",
+			Annotations: map[string]string{"annotation": "test"},
+			Labels:      map[string]string{"label": "test"},
+		},
+		Spec: corev1.PodSpec{
+			Containers: []corev1.Container{{
+				Name:  "test",
+				Image: "testimage",
+				VolumeMounts: []corev1.VolumeMount{{
+					Name:      "test",
+					MountPath: "/test",
+				}},
+			}},
+			Volumes: []corev1.Volume{{
+				Name: "test",
+				VolumeSource: corev1.VolumeSource{
+					EmptyDir: &corev1.EmptyDirVolumeSource{},
+				},
+			}},
+		},
+	}
+
+	tests := []struct {
+		name       string
+		encoding   encoding
+		testObject any
+	}{
+		{name: "gob", encoding: &gobEncoding{}},
+		{name: "json", encoding: &jsonEncoding{}},
+		{name: "gob+gzip", encoding: gzipped(&gobEncoding{})},
+		{name: "json+gzip", encoding: gzipped(&jsonEncoding{})},
+	}
+	for _, tt := range tests {
+		for objType, testObject := range map[string]any{
+			"10KB-configmap": cm,
+			"pod":            pod,
+		} {
+			b.Run(tt.name+"-"+objType, func(b *testing.B) {
+				b.Run("encoding", func(b *testing.B) {
+					for b.Loop() {
+						w := new(discardWriter)
+						if err := tt.encoding.Encode(w, testObject); err != nil {
+							b.Error(err)
+						}
+						b.ReportMetric(float64(w.count), "bytes")
+					}
+				})
+
+				var buf bytes.Buffer
+				if err := tt.encoding.Encode(&buf, testObject); err != nil {
+					b.Fatal(err)
+				}
+				serialized := buf.Bytes()
+				b.Run("decoding", func(b *testing.B) {
+					var dest corev1.ConfigMap
+					for b.Loop() {
+						if err := tt.encoding.Decode(bytes.NewReader(serialized), &dest); err != nil {
+							b.Fatal(err)
+						}
+					}
+				})
+			})
+		}
+	}
+}
+
+type discardWriter struct {
+	count int
+}
+
+func (d *discardWriter) Write(p []byte) (int, error) {
+	n, err := io.Discard.Write(p)
+	d.count += n
+	return n, err
+}
+
+// generateTestData returns a single-entry map whose value is a randomly-generated string of length n
+func generateTestData(n int) map[string]string {
+	const letters = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"
+	b := make([]byte, n)
+	for i := range b {
+		b[i] = letters[random.Int63()%int64(len(letters))]
+	}
+	return map[string]string{
+		"data": string(b),
+	}
+}
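The gzipped wrapper exercised by the tests above keeps gzip writers and readers in a sync.Pool and re-targets them with Reset instead of allocating new ones per call. A standalone sketch of that pooling pattern for the writer side, separate from the patch itself (names are illustrative):

package main

import (
	"bytes"
	"compress/gzip"
	"fmt"
	"io"
	"sync"
)

var writers sync.Pool

func compress(w io.Writer, data []byte) error {
	gzw, ok := writers.Get().(*gzip.Writer)
	if !ok {
		// Pool was empty: allocate once, then keep reusing it.
		gzw = gzip.NewWriter(io.Discard)
	}
	gzw.Reset(w)
	defer func() {
		gzw.Reset(nil) // drop the reference to w before pooling
		writers.Put(gzw)
	}()

	if _, err := gzw.Write(data); err != nil {
		return err
	}
	return gzw.Close() // flushes the gzip trailer into w
}

func main() {
	var buf bytes.Buffer
	if err := compress(&buf, []byte("hello hello hello")); err != nil {
		panic(err)
	}
	fmt.Println(buf.Len(), "compressed bytes")
}

Resetting to nil before Put keeps pooled writers from pinning the caller's destination writer, which matches the deferred Reset(nil) in gzipEncoding.Encode.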
diff --git a/pkg/sqlcache/store/store.go b/pkg/sqlcache/store/store.go
index 3c73a477..8c6a01f5 100644
--- a/pkg/sqlcache/store/store.go
+++ b/pkg/sqlcache/store/store.go
@@ -79,13 +79,17 @@ var _ cache.Store = (*Store)(nil)
 
 // NewStore creates a SQLite-backed cache.Store for objects of the given example type
 func NewStore(ctx context.Context, example any, keyFunc cache.KeyFunc, c db.Client, shouldEncrypt bool, gvk schema.GroupVersionKind, name string, externalUpdateInfo *sqltypes.ExternalGVKUpdates, selfUpdateInfo *sqltypes.ExternalGVKUpdates) (*Store, error) {
+	exampleType := reflect.TypeOf(example)
+	if exampleType.Kind() != reflect.Ptr {
+		exampleType = reflect.PointerTo(exampleType)
+	}
 	s := &Store{
 		ctx:                ctx,
 		name:               name,
 		gvk:                gvk,
 		externalUpdateInfo: externalUpdateInfo,
 		selfUpdateInfo:     selfUpdateInfo,
-		typ:                reflect.TypeOf(example),
+		typ:                exampleType,
 		Client:             c,
 		keyFunc:            keyFunc,
 		shouldEncrypt:      shouldEncrypt,
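The NewStore hunk above normalizes the example's reflect.Type so that typ is a pointer type whether the caller passes a value or a pointer. A standalone sketch of those reflect calls, separate from the patch itself; the reflect.New usage at the end is an assumption about how the type is consumed downstream, not something shown in this diff:

package main

import (
	"fmt"
	"reflect"
)

type pod struct{ Name string }

// normalizeType mirrors the check in NewStore: if the example is not already
// a pointer, use the corresponding pointer type.
func normalizeType(example any) reflect.Type {
	t := reflect.TypeOf(example)
	if t.Kind() != reflect.Ptr {
		t = reflect.PointerTo(t)
	}
	return t
}

func main() {
	fmt.Println(normalizeType(pod{}))  // *main.pod
	fmt.Println(normalizeType(&pod{})) // *main.pod

	// Assumed downstream use: allocate a fresh destination of the element
	// type and obtain a pointer to decode into.
	dest := reflect.New(normalizeType(pod{}).Elem()).Interface()
	fmt.Printf("%T\n", dest) // *main.pod
}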