mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-21 10:51:29 +00:00
1010 lines
31 KiB
Go
1010 lines
31 KiB
Go
/*
|
|
Copyright 2021 The Kubernetes Authors.
|
|
|
|
Licensed under the Apache License, Version 2.0 (the "License");
|
|
you may not use this file except in compliance with the License.
|
|
You may obtain a copy of the License at
|
|
|
|
http://www.apache.org/licenses/LICENSE-2.0
|
|
|
|
Unless required by applicable law or agreed to in writing, software
|
|
distributed under the License is distributed on an "AS IS" BASIS,
|
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
See the License for the specific language governing permissions and
|
|
limitations under the License.
|
|
*/
|
|
|
|
package tracing
|
|
|
|
import (
|
|
"context"
|
|
"encoding/hex"
|
|
"encoding/json"
|
|
"fmt"
|
|
"net"
|
|
"os"
|
|
"strings"
|
|
"sync"
|
|
"testing"
|
|
"time"
|
|
|
|
traceservice "go.opentelemetry.io/proto/otlp/collector/trace/v1"
|
|
commonv1 "go.opentelemetry.io/proto/otlp/common/v1"
|
|
tracev1 "go.opentelemetry.io/proto/otlp/trace/v1"
|
|
"google.golang.org/grpc"
|
|
|
|
v1 "k8s.io/api/core/v1"
|
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
|
"k8s.io/apimachinery/pkg/types"
|
|
"k8s.io/apimachinery/pkg/util/strategicpatch"
|
|
client "k8s.io/client-go/kubernetes"
|
|
kubeapiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing"
|
|
"k8s.io/kubernetes/test/integration/framework"
|
|
)
|
|
|
|
func TestAPIServerTracingWithEgressSelector(t *testing.T) {
|
|
// Listen for traces from the API Server before starting it, so the
|
|
// API Server will successfully connect right away during the test.
|
|
listener, err := net.Listen("tcp", "localhost:")
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
// Use an egress selector which doesn't have a controlplane config to ensure
|
|
// tracing works in that context. Write the egress selector configuration to a file.
|
|
egressSelectorConfigFile, err := os.CreateTemp("", "egress_selector_configuration.yaml")
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
defer os.Remove(egressSelectorConfigFile.Name())
|
|
|
|
if err := os.WriteFile(egressSelectorConfigFile.Name(), []byte(`
|
|
apiVersion: apiserver.config.k8s.io/v1beta1
|
|
kind: EgressSelectorConfiguration
|
|
egressSelections:
|
|
- name: cluster
|
|
connection:
|
|
proxyProtocol: Direct
|
|
transport:`), os.FileMode(0755)); err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
|
|
// Write the configuration for tracing to a file
|
|
tracingConfigFile, err := os.CreateTemp("", "tracing-config.yaml")
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
defer os.Remove(tracingConfigFile.Name())
|
|
|
|
if err := os.WriteFile(tracingConfigFile.Name(), []byte(fmt.Sprintf(`
|
|
apiVersion: apiserver.config.k8s.io/v1beta1
|
|
kind: TracingConfiguration
|
|
endpoint: %s`, listener.Addr().String())), os.FileMode(0755)); err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
|
|
// Start the API Server with our tracing configuration
|
|
testServer := kubeapiservertesting.StartTestServerOrDie(t,
|
|
kubeapiservertesting.NewDefaultTestServerOptions(),
|
|
[]string{
|
|
"--tracing-config-file=" + tracingConfigFile.Name(),
|
|
"--egress-selector-config-file=" + egressSelectorConfigFile.Name(),
|
|
},
|
|
framework.SharedEtcd(),
|
|
)
|
|
defer testServer.TearDownFn()
|
|
clientSet, err := client.NewForConfig(testServer.ClientConfig)
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
// Make sure the API Server hasn't crashed.
|
|
_, err = clientSet.CoreV1().Nodes().List(context.Background(), metav1.ListOptions{})
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
}
|
|
|
|
func TestAPIServerTracing(t *testing.T) {
|
|
// Listen for traces from the API Server before starting it, so the
|
|
// API Server will successfully connect right away during the test.
|
|
listener, err := net.Listen("tcp", "localhost:")
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
// Write the configuration for tracing to a file
|
|
tracingConfigFile, err := os.CreateTemp("", "tracing-config.yaml")
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
defer os.Remove(tracingConfigFile.Name())
|
|
|
|
if err := os.WriteFile(tracingConfigFile.Name(), []byte(fmt.Sprintf(`
|
|
apiVersion: apiserver.config.k8s.io/v1beta1
|
|
kind: TracingConfiguration
|
|
samplingRatePerMillion: 1000000
|
|
endpoint: %s`, listener.Addr().String())), os.FileMode(0755)); err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
|
|
srv := grpc.NewServer()
|
|
fakeServer := &traceServer{t: t}
|
|
fakeServer.resetExpectations([]*spanExpectation{})
|
|
traceservice.RegisterTraceServiceServer(srv, fakeServer)
|
|
|
|
go srv.Serve(listener)
|
|
defer srv.Stop()
|
|
|
|
// Start the API Server with our tracing configuration
|
|
testServer := kubeapiservertesting.StartTestServerOrDie(t,
|
|
kubeapiservertesting.NewDefaultTestServerOptions(),
|
|
[]string{"--tracing-config-file=" + tracingConfigFile.Name()},
|
|
framework.SharedEtcd(),
|
|
)
|
|
defer testServer.TearDownFn()
|
|
clientSet, err := client.NewForConfig(testServer.ClientConfig)
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
|
|
for _, tc := range []struct {
|
|
desc string
|
|
apiCall func(*client.Clientset) error
|
|
expectedTrace []*spanExpectation
|
|
}{
|
|
{
|
|
desc: "create node",
|
|
apiCall: func(c *client.Clientset) error {
|
|
_, err = clientSet.CoreV1().Nodes().Create(context.Background(),
|
|
&v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "fake"}}, metav1.CreateOptions{})
|
|
return err
|
|
},
|
|
expectedTrace: []*spanExpectation{
|
|
{
|
|
name: "KubernetesAPI",
|
|
attributes: map[string]func(*commonv1.AnyValue) bool{
|
|
"http.user_agent": func(v *commonv1.AnyValue) bool {
|
|
return strings.HasPrefix(v.GetStringValue(), "tracing.test")
|
|
},
|
|
"http.target": func(v *commonv1.AnyValue) bool {
|
|
return v.GetStringValue() == "/api/v1/nodes"
|
|
},
|
|
"http.method": func(v *commonv1.AnyValue) bool {
|
|
return v.GetStringValue() == "POST"
|
|
},
|
|
},
|
|
},
|
|
{
|
|
name: "authentication",
|
|
},
|
|
{
|
|
name: "Create",
|
|
attributes: map[string]func(*commonv1.AnyValue) bool{
|
|
"url": func(v *commonv1.AnyValue) bool {
|
|
return strings.HasSuffix(v.GetStringValue(), "/api/v1/nodes")
|
|
},
|
|
"user-agent": func(v *commonv1.AnyValue) bool {
|
|
return strings.HasPrefix(v.GetStringValue(), "tracing.test")
|
|
},
|
|
"audit-id": func(v *commonv1.AnyValue) bool {
|
|
return v.GetStringValue() != ""
|
|
},
|
|
"client": func(v *commonv1.AnyValue) bool {
|
|
return v.GetStringValue() == "127.0.0.1"
|
|
},
|
|
"accept": func(v *commonv1.AnyValue) bool {
|
|
return v.GetStringValue() == "application/vnd.kubernetes.protobuf, */*"
|
|
},
|
|
"protocol": func(v *commonv1.AnyValue) bool {
|
|
return v.GetStringValue() == "HTTP/2.0"
|
|
},
|
|
},
|
|
events: []string{
|
|
"limitedReadBody succeeded",
|
|
"About to convert to expected version",
|
|
"Conversion done",
|
|
"About to store object in database",
|
|
"Write to database call succeeded",
|
|
"About to write a response",
|
|
"Writing http response done",
|
|
},
|
|
},
|
|
{
|
|
name: "Create etcd3",
|
|
attributes: map[string]func(*commonv1.AnyValue) bool{
|
|
"audit-id": func(v *commonv1.AnyValue) bool {
|
|
return v.GetStringValue() != ""
|
|
},
|
|
"key": func(v *commonv1.AnyValue) bool {
|
|
return v.GetStringValue() == "/minions/fake"
|
|
},
|
|
"type": func(v *commonv1.AnyValue) bool {
|
|
return v.GetStringValue() == "*core.Node"
|
|
},
|
|
"resource": func(v *commonv1.AnyValue) bool {
|
|
return v.GetStringValue() == "nodes"
|
|
},
|
|
},
|
|
events: []string{
|
|
"About to Encode",
|
|
"Encode succeeded",
|
|
"TransformToStorage succeeded",
|
|
"Txn call succeeded",
|
|
"decode succeeded",
|
|
},
|
|
},
|
|
{
|
|
name: "etcdserverpb.KV/Txn",
|
|
attributes: map[string]func(*commonv1.AnyValue) bool{
|
|
"rpc.system": func(v *commonv1.AnyValue) bool {
|
|
return v.GetStringValue() == "grpc"
|
|
},
|
|
},
|
|
events: []string{"message"},
|
|
},
|
|
{
|
|
name: "SerializeObject",
|
|
attributes: map[string]func(*commonv1.AnyValue) bool{
|
|
"url": func(v *commonv1.AnyValue) bool {
|
|
return strings.HasSuffix(v.GetStringValue(), "/api/v1/nodes")
|
|
},
|
|
"audit-id": func(v *commonv1.AnyValue) bool {
|
|
return v.GetStringValue() != ""
|
|
},
|
|
"protocol": func(v *commonv1.AnyValue) bool {
|
|
return v.GetStringValue() == "HTTP/2.0"
|
|
},
|
|
"method": func(v *commonv1.AnyValue) bool {
|
|
return v.GetStringValue() == "POST"
|
|
},
|
|
"mediaType": func(v *commonv1.AnyValue) bool {
|
|
return v.GetStringValue() == "application/vnd.kubernetes.protobuf"
|
|
},
|
|
"encoder": func(v *commonv1.AnyValue) bool {
|
|
return v.GetStringValue() == "{\"encodeGV\":\"v1\",\"encoder\":\"protobuf\",\"name\":\"versioning\"}"
|
|
},
|
|
},
|
|
events: []string{
|
|
"About to start writing response",
|
|
"Write call succeeded",
|
|
},
|
|
},
|
|
},
|
|
},
|
|
{
|
|
desc: "get node",
|
|
apiCall: func(c *client.Clientset) error {
|
|
// This depends on the "create node" step having completed successfully
|
|
_, err = clientSet.CoreV1().Nodes().Get(context.Background(), "fake", metav1.GetOptions{})
|
|
return err
|
|
},
|
|
expectedTrace: []*spanExpectation{
|
|
{
|
|
name: "KubernetesAPI",
|
|
attributes: map[string]func(*commonv1.AnyValue) bool{
|
|
"http.user_agent": func(v *commonv1.AnyValue) bool {
|
|
return strings.HasPrefix(v.GetStringValue(), "tracing.test")
|
|
},
|
|
"http.target": func(v *commonv1.AnyValue) bool {
|
|
return v.GetStringValue() == "/api/v1/nodes/fake"
|
|
},
|
|
"http.method": func(v *commonv1.AnyValue) bool {
|
|
return v.GetStringValue() == "GET"
|
|
},
|
|
},
|
|
},
|
|
{
|
|
name: "authentication",
|
|
},
|
|
{
|
|
name: "Get",
|
|
attributes: map[string]func(*commonv1.AnyValue) bool{
|
|
"url": func(v *commonv1.AnyValue) bool {
|
|
return strings.HasSuffix(v.GetStringValue(), "/api/v1/nodes/fake")
|
|
},
|
|
"user-agent": func(v *commonv1.AnyValue) bool {
|
|
return strings.HasPrefix(v.GetStringValue(), "tracing.test")
|
|
},
|
|
"audit-id": func(v *commonv1.AnyValue) bool {
|
|
return v.GetStringValue() != ""
|
|
},
|
|
"client": func(v *commonv1.AnyValue) bool {
|
|
return v.GetStringValue() == "127.0.0.1"
|
|
},
|
|
"accept": func(v *commonv1.AnyValue) bool {
|
|
return v.GetStringValue() == "application/vnd.kubernetes.protobuf, */*"
|
|
},
|
|
"protocol": func(v *commonv1.AnyValue) bool {
|
|
return v.GetStringValue() == "HTTP/2.0"
|
|
},
|
|
},
|
|
events: []string{
|
|
"About to Get from storage",
|
|
"About to write a response",
|
|
"Writing http response done",
|
|
},
|
|
},
|
|
{
|
|
name: "etcdserverpb.KV/Range",
|
|
attributes: map[string]func(*commonv1.AnyValue) bool{
|
|
"rpc.system": func(v *commonv1.AnyValue) bool {
|
|
return v.GetStringValue() == "grpc"
|
|
},
|
|
},
|
|
events: []string{"message"},
|
|
},
|
|
{
|
|
name: "SerializeObject",
|
|
attributes: map[string]func(*commonv1.AnyValue) bool{
|
|
"url": func(v *commonv1.AnyValue) bool {
|
|
return v.GetStringValue() == "/api/v1/nodes/fake"
|
|
},
|
|
"audit-id": func(v *commonv1.AnyValue) bool {
|
|
return v.GetStringValue() != ""
|
|
},
|
|
"protocol": func(v *commonv1.AnyValue) bool {
|
|
return v.GetStringValue() == "HTTP/2.0"
|
|
},
|
|
"method": func(v *commonv1.AnyValue) bool {
|
|
return v.GetStringValue() == "GET"
|
|
},
|
|
"mediaType": func(v *commonv1.AnyValue) bool {
|
|
return v.GetStringValue() == "application/vnd.kubernetes.protobuf"
|
|
},
|
|
"encoder": func(v *commonv1.AnyValue) bool {
|
|
return v.GetStringValue() == "{\"encodeGV\":\"v1\",\"encoder\":\"protobuf\",\"name\":\"versioning\"}"
|
|
},
|
|
},
|
|
events: []string{
|
|
"About to start writing response",
|
|
"Write call succeeded",
|
|
},
|
|
},
|
|
},
|
|
},
|
|
{
|
|
desc: "list nodes",
|
|
apiCall: func(c *client.Clientset) error {
|
|
_, err = clientSet.CoreV1().Nodes().List(context.Background(), metav1.ListOptions{})
|
|
return err
|
|
},
|
|
expectedTrace: []*spanExpectation{
|
|
{
|
|
name: "KubernetesAPI",
|
|
attributes: map[string]func(*commonv1.AnyValue) bool{
|
|
"http.user_agent": func(v *commonv1.AnyValue) bool {
|
|
return strings.HasPrefix(v.GetStringValue(), "tracing.test")
|
|
},
|
|
"http.target": func(v *commonv1.AnyValue) bool {
|
|
return v.GetStringValue() == "/api/v1/nodes"
|
|
},
|
|
"http.method": func(v *commonv1.AnyValue) bool {
|
|
return v.GetStringValue() == "GET"
|
|
},
|
|
},
|
|
},
|
|
{
|
|
name: "authentication",
|
|
},
|
|
{
|
|
name: "List",
|
|
attributes: map[string]func(*commonv1.AnyValue) bool{
|
|
"url": func(v *commonv1.AnyValue) bool {
|
|
return v.GetStringValue() == "/api/v1/nodes"
|
|
},
|
|
"user-agent": func(v *commonv1.AnyValue) bool {
|
|
return strings.HasPrefix(v.GetStringValue(), "tracing.test")
|
|
},
|
|
"audit-id": func(v *commonv1.AnyValue) bool {
|
|
return v.GetStringValue() != ""
|
|
},
|
|
"client": func(v *commonv1.AnyValue) bool {
|
|
return v.GetStringValue() == "127.0.0.1"
|
|
},
|
|
"accept": func(v *commonv1.AnyValue) bool {
|
|
return v.GetStringValue() == "application/vnd.kubernetes.protobuf, */*"
|
|
},
|
|
"protocol": func(v *commonv1.AnyValue) bool {
|
|
return v.GetStringValue() == "HTTP/2.0"
|
|
},
|
|
},
|
|
events: []string{
|
|
"About to List from storage",
|
|
"Listing from storage done",
|
|
"Writing http response done",
|
|
},
|
|
},
|
|
{
|
|
name: "List(recursive=true) etcd3",
|
|
attributes: map[string]func(*commonv1.AnyValue) bool{
|
|
"audit-id": func(v *commonv1.AnyValue) bool {
|
|
return v.GetStringValue() != ""
|
|
},
|
|
"key": func(v *commonv1.AnyValue) bool {
|
|
return v.GetStringValue() == "/minions"
|
|
},
|
|
"resourceVersion": func(v *commonv1.AnyValue) bool {
|
|
return v.GetStringValue() == ""
|
|
},
|
|
"resourceVersionMatch": func(v *commonv1.AnyValue) bool {
|
|
return v.GetStringValue() == ""
|
|
},
|
|
"limit": func(v *commonv1.AnyValue) bool {
|
|
return v.GetIntValue() == 0
|
|
},
|
|
"continue": func(v *commonv1.AnyValue) bool {
|
|
return v.GetStringValue() == ""
|
|
},
|
|
},
|
|
},
|
|
{
|
|
name: "etcdserverpb.KV/Range",
|
|
attributes: map[string]func(*commonv1.AnyValue) bool{
|
|
"rpc.system": func(v *commonv1.AnyValue) bool {
|
|
return v.GetStringValue() == "grpc"
|
|
},
|
|
},
|
|
events: []string{"message"},
|
|
},
|
|
{
|
|
name: "SerializeObject",
|
|
attributes: map[string]func(*commonv1.AnyValue) bool{
|
|
"url": func(v *commonv1.AnyValue) bool {
|
|
return v.GetStringValue() == "/api/v1/nodes"
|
|
},
|
|
"audit-id": func(v *commonv1.AnyValue) bool {
|
|
return v.GetStringValue() != ""
|
|
},
|
|
"protocol": func(v *commonv1.AnyValue) bool {
|
|
return v.GetStringValue() == "HTTP/2.0"
|
|
},
|
|
"method": func(v *commonv1.AnyValue) bool {
|
|
return v.GetStringValue() == "GET"
|
|
},
|
|
"mediaType": func(v *commonv1.AnyValue) bool {
|
|
return v.GetStringValue() == "application/vnd.kubernetes.protobuf"
|
|
},
|
|
"encoder": func(v *commonv1.AnyValue) bool {
|
|
return v.GetStringValue() == "{\"encodeGV\":\"v1\",\"encoder\":\"protobuf\",\"name\":\"versioning\"}"
|
|
},
|
|
},
|
|
events: []string{
|
|
"About to start writing response",
|
|
"Write call succeeded",
|
|
},
|
|
},
|
|
},
|
|
},
|
|
{
|
|
desc: "update node",
|
|
apiCall: func(c *client.Clientset) error {
|
|
// This depends on the "create node" step having completed successfully
|
|
_, err = clientSet.CoreV1().Nodes().Update(context.Background(),
|
|
&v1.Node{ObjectMeta: metav1.ObjectMeta{
|
|
Name: "fake",
|
|
Annotations: map[string]string{"foo": "bar"},
|
|
}}, metav1.UpdateOptions{})
|
|
return err
|
|
},
|
|
expectedTrace: []*spanExpectation{
|
|
{
|
|
name: "KubernetesAPI",
|
|
attributes: map[string]func(*commonv1.AnyValue) bool{
|
|
"http.user_agent": func(v *commonv1.AnyValue) bool {
|
|
return strings.HasPrefix(v.GetStringValue(), "tracing.test")
|
|
},
|
|
"http.target": func(v *commonv1.AnyValue) bool {
|
|
return v.GetStringValue() == "/api/v1/nodes/fake"
|
|
},
|
|
"http.method": func(v *commonv1.AnyValue) bool {
|
|
return v.GetStringValue() == "PUT"
|
|
},
|
|
},
|
|
},
|
|
{
|
|
name: "authentication",
|
|
},
|
|
{
|
|
name: "Update",
|
|
attributes: map[string]func(*commonv1.AnyValue) bool{
|
|
"url": func(v *commonv1.AnyValue) bool {
|
|
return strings.HasSuffix(v.GetStringValue(), "/api/v1/nodes/fake")
|
|
},
|
|
"user-agent": func(v *commonv1.AnyValue) bool {
|
|
return strings.HasPrefix(v.GetStringValue(), "tracing.test")
|
|
},
|
|
"audit-id": func(v *commonv1.AnyValue) bool {
|
|
return v.GetStringValue() != ""
|
|
},
|
|
"client": func(v *commonv1.AnyValue) bool {
|
|
return v.GetStringValue() == "127.0.0.1"
|
|
},
|
|
"accept": func(v *commonv1.AnyValue) bool {
|
|
return v.GetStringValue() == "application/vnd.kubernetes.protobuf, */*"
|
|
},
|
|
"protocol": func(v *commonv1.AnyValue) bool {
|
|
return v.GetStringValue() == "HTTP/2.0"
|
|
},
|
|
},
|
|
events: []string{
|
|
"limitedReadBody succeeded",
|
|
"About to convert to expected version",
|
|
"Conversion done",
|
|
"About to store object in database",
|
|
"Write to database call succeeded",
|
|
"About to write a response",
|
|
"Writing http response done",
|
|
},
|
|
},
|
|
{
|
|
name: "GuaranteedUpdate etcd3",
|
|
attributes: map[string]func(*commonv1.AnyValue) bool{
|
|
"audit-id": func(v *commonv1.AnyValue) bool {
|
|
return v.GetStringValue() != ""
|
|
},
|
|
"key": func(v *commonv1.AnyValue) bool {
|
|
return v.GetStringValue() == "/minions/fake"
|
|
},
|
|
"type": func(v *commonv1.AnyValue) bool {
|
|
return v.GetStringValue() == "*core.Node"
|
|
},
|
|
"resource": func(v *commonv1.AnyValue) bool {
|
|
return v.GetStringValue() == "nodes"
|
|
},
|
|
},
|
|
events: []string{
|
|
"initial value restored",
|
|
"About to Encode",
|
|
"Encode succeeded",
|
|
"TransformToStorage succeeded",
|
|
"Transaction prepared",
|
|
"Txn call completed",
|
|
"Transaction committed",
|
|
"decode succeeded",
|
|
},
|
|
},
|
|
{
|
|
name: "etcdserverpb.KV/Txn",
|
|
attributes: map[string]func(*commonv1.AnyValue) bool{
|
|
"rpc.system": func(v *commonv1.AnyValue) bool {
|
|
return v.GetStringValue() == "grpc"
|
|
},
|
|
},
|
|
events: []string{"message"},
|
|
},
|
|
{
|
|
name: "SerializeObject",
|
|
attributes: map[string]func(*commonv1.AnyValue) bool{
|
|
"url": func(v *commonv1.AnyValue) bool {
|
|
return strings.HasSuffix(v.GetStringValue(), "/api/v1/nodes/fake")
|
|
},
|
|
"audit-id": func(v *commonv1.AnyValue) bool {
|
|
return v.GetStringValue() != ""
|
|
},
|
|
"protocol": func(v *commonv1.AnyValue) bool {
|
|
return v.GetStringValue() == "HTTP/2.0"
|
|
},
|
|
"method": func(v *commonv1.AnyValue) bool {
|
|
return v.GetStringValue() == "PUT"
|
|
},
|
|
"mediaType": func(v *commonv1.AnyValue) bool {
|
|
return v.GetStringValue() == "application/vnd.kubernetes.protobuf"
|
|
},
|
|
"encoder": func(v *commonv1.AnyValue) bool {
|
|
return v.GetStringValue() == "{\"encodeGV\":\"v1\",\"encoder\":\"protobuf\",\"name\":\"versioning\"}"
|
|
},
|
|
},
|
|
events: []string{
|
|
"About to start writing response",
|
|
"Write call succeeded",
|
|
},
|
|
},
|
|
},
|
|
},
|
|
{
|
|
desc: "patch node",
|
|
apiCall: func(c *client.Clientset) error {
|
|
// This depends on the "create node" step having completed successfully
|
|
oldNode := &v1.Node{ObjectMeta: metav1.ObjectMeta{
|
|
Name: "fake",
|
|
Annotations: map[string]string{"foo": "bar"},
|
|
}}
|
|
oldData, err := json.Marshal(oldNode)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
newNode := &v1.Node{ObjectMeta: metav1.ObjectMeta{
|
|
Name: "fake",
|
|
Annotations: map[string]string{"foo": "bar"},
|
|
Labels: map[string]string{"hello": "world"},
|
|
}}
|
|
newData, err := json.Marshal(newNode)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, v1.Node{})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
_, err = clientSet.CoreV1().Nodes().Patch(context.Background(), "fake", types.StrategicMergePatchType, patchBytes, metav1.PatchOptions{})
|
|
return err
|
|
},
|
|
expectedTrace: []*spanExpectation{
|
|
{
|
|
name: "KubernetesAPI",
|
|
attributes: map[string]func(*commonv1.AnyValue) bool{
|
|
"http.user_agent": func(v *commonv1.AnyValue) bool {
|
|
return strings.HasPrefix(v.GetStringValue(), "tracing.test")
|
|
},
|
|
"http.target": func(v *commonv1.AnyValue) bool {
|
|
return v.GetStringValue() == "/api/v1/nodes/fake"
|
|
},
|
|
"http.method": func(v *commonv1.AnyValue) bool {
|
|
return v.GetStringValue() == "PATCH"
|
|
},
|
|
},
|
|
},
|
|
{
|
|
name: "authentication",
|
|
},
|
|
{
|
|
name: "Patch",
|
|
attributes: map[string]func(*commonv1.AnyValue) bool{
|
|
"url": func(v *commonv1.AnyValue) bool {
|
|
return strings.HasSuffix(v.GetStringValue(), "/api/v1/nodes/fake")
|
|
},
|
|
"user-agent": func(v *commonv1.AnyValue) bool {
|
|
return strings.HasPrefix(v.GetStringValue(), "tracing.test")
|
|
},
|
|
"audit-id": func(v *commonv1.AnyValue) bool {
|
|
return v.GetStringValue() != ""
|
|
},
|
|
"client": func(v *commonv1.AnyValue) bool {
|
|
return v.GetStringValue() == "127.0.0.1"
|
|
},
|
|
"accept": func(v *commonv1.AnyValue) bool {
|
|
return v.GetStringValue() == "application/vnd.kubernetes.protobuf, */*"
|
|
},
|
|
"protocol": func(v *commonv1.AnyValue) bool {
|
|
return v.GetStringValue() == "HTTP/2.0"
|
|
},
|
|
},
|
|
events: []string{
|
|
"limitedReadBody succeeded",
|
|
"Recorded the audit event",
|
|
"About to apply patch",
|
|
"About to check admission control",
|
|
"Object stored in database",
|
|
"About to write a response",
|
|
"Writing http response done",
|
|
},
|
|
},
|
|
{
|
|
name: "GuaranteedUpdate etcd3",
|
|
attributes: map[string]func(*commonv1.AnyValue) bool{
|
|
"audit-id": func(v *commonv1.AnyValue) bool {
|
|
return v.GetStringValue() != ""
|
|
},
|
|
"key": func(v *commonv1.AnyValue) bool {
|
|
return v.GetStringValue() == "/minions/fake"
|
|
},
|
|
"type": func(v *commonv1.AnyValue) bool {
|
|
return v.GetStringValue() == "*core.Node"
|
|
},
|
|
"resource": func(v *commonv1.AnyValue) bool {
|
|
return v.GetStringValue() == "nodes"
|
|
},
|
|
},
|
|
events: []string{
|
|
"initial value restored",
|
|
"About to Encode",
|
|
"Encode succeeded",
|
|
"TransformToStorage succeeded",
|
|
"Transaction prepared",
|
|
"Txn call completed",
|
|
"Transaction committed",
|
|
"decode succeeded",
|
|
},
|
|
},
|
|
{
|
|
name: "etcdserverpb.KV/Txn",
|
|
attributes: map[string]func(*commonv1.AnyValue) bool{
|
|
"rpc.system": func(v *commonv1.AnyValue) bool {
|
|
return v.GetStringValue() == "grpc"
|
|
},
|
|
},
|
|
events: []string{"message"},
|
|
},
|
|
{
|
|
name: "SerializeObject",
|
|
attributes: map[string]func(*commonv1.AnyValue) bool{
|
|
"url": func(v *commonv1.AnyValue) bool {
|
|
return strings.HasSuffix(v.GetStringValue(), "/api/v1/nodes/fake")
|
|
},
|
|
"audit-id": func(v *commonv1.AnyValue) bool {
|
|
return v.GetStringValue() != ""
|
|
},
|
|
"protocol": func(v *commonv1.AnyValue) bool {
|
|
return v.GetStringValue() == "HTTP/2.0"
|
|
},
|
|
"method": func(v *commonv1.AnyValue) bool {
|
|
return v.GetStringValue() == "PATCH"
|
|
},
|
|
"mediaType": func(v *commonv1.AnyValue) bool {
|
|
return v.GetStringValue() == "application/vnd.kubernetes.protobuf"
|
|
},
|
|
"encoder": func(v *commonv1.AnyValue) bool {
|
|
return v.GetStringValue() == "{\"encodeGV\":\"v1\",\"encoder\":\"protobuf\",\"name\":\"versioning\"}"
|
|
},
|
|
},
|
|
events: []string{
|
|
"About to start writing response",
|
|
"Write call succeeded",
|
|
},
|
|
},
|
|
},
|
|
},
|
|
{
|
|
desc: "delete node",
|
|
apiCall: func(c *client.Clientset) error {
|
|
// This depends on the "create node" step having completed successfully
|
|
return clientSet.CoreV1().Nodes().Delete(context.Background(), "fake", metav1.DeleteOptions{})
|
|
},
|
|
expectedTrace: []*spanExpectation{
|
|
{
|
|
name: "KubernetesAPI",
|
|
attributes: map[string]func(*commonv1.AnyValue) bool{
|
|
"http.user_agent": func(v *commonv1.AnyValue) bool {
|
|
return strings.HasPrefix(v.GetStringValue(), "tracing.test")
|
|
},
|
|
"http.target": func(v *commonv1.AnyValue) bool {
|
|
return v.GetStringValue() == "/api/v1/nodes/fake"
|
|
},
|
|
"http.method": func(v *commonv1.AnyValue) bool {
|
|
return v.GetStringValue() == "DELETE"
|
|
},
|
|
},
|
|
},
|
|
{
|
|
name: "authentication",
|
|
},
|
|
{
|
|
name: "Delete",
|
|
attributes: map[string]func(*commonv1.AnyValue) bool{
|
|
"url": func(v *commonv1.AnyValue) bool {
|
|
return strings.HasSuffix(v.GetStringValue(), "/api/v1/nodes/fake")
|
|
},
|
|
"user-agent": func(v *commonv1.AnyValue) bool {
|
|
return strings.HasPrefix(v.GetStringValue(), "tracing.test")
|
|
},
|
|
"audit-id": func(v *commonv1.AnyValue) bool {
|
|
return v.GetStringValue() != ""
|
|
},
|
|
"client": func(v *commonv1.AnyValue) bool {
|
|
return v.GetStringValue() == "127.0.0.1"
|
|
},
|
|
"accept": func(v *commonv1.AnyValue) bool {
|
|
return v.GetStringValue() == "application/vnd.kubernetes.protobuf, */*"
|
|
},
|
|
"protocol": func(v *commonv1.AnyValue) bool {
|
|
return v.GetStringValue() == "HTTP/2.0"
|
|
},
|
|
},
|
|
events: []string{
|
|
"limitedReadBody succeeded",
|
|
"Decoded delete options",
|
|
"Recorded the audit event",
|
|
"About to delete object from database",
|
|
"Object deleted from database",
|
|
"About to write a response",
|
|
"Writing http response done",
|
|
},
|
|
},
|
|
{
|
|
name: "etcdserverpb.KV/Txn",
|
|
attributes: map[string]func(*commonv1.AnyValue) bool{
|
|
"rpc.system": func(v *commonv1.AnyValue) bool {
|
|
return v.GetStringValue() == "grpc"
|
|
},
|
|
},
|
|
events: []string{"message"},
|
|
},
|
|
{
|
|
name: "SerializeObject",
|
|
attributes: map[string]func(*commonv1.AnyValue) bool{
|
|
"url": func(v *commonv1.AnyValue) bool {
|
|
return strings.HasSuffix(v.GetStringValue(), "/api/v1/nodes/fake")
|
|
},
|
|
"audit-id": func(v *commonv1.AnyValue) bool {
|
|
return v.GetStringValue() != ""
|
|
},
|
|
"protocol": func(v *commonv1.AnyValue) bool {
|
|
return v.GetStringValue() == "HTTP/2.0"
|
|
},
|
|
"method": func(v *commonv1.AnyValue) bool {
|
|
return v.GetStringValue() == "DELETE"
|
|
},
|
|
"mediaType": func(v *commonv1.AnyValue) bool {
|
|
return v.GetStringValue() == "application/vnd.kubernetes.protobuf"
|
|
},
|
|
"encoder": func(v *commonv1.AnyValue) bool {
|
|
return v.GetStringValue() == "{\"encodeGV\":\"v1\",\"encoder\":\"protobuf\",\"name\":\"versioning\"}"
|
|
},
|
|
},
|
|
events: []string{
|
|
"About to start writing response",
|
|
"Write call succeeded",
|
|
},
|
|
},
|
|
},
|
|
},
|
|
} {
|
|
t.Run(tc.desc, func(t *testing.T) {
|
|
fakeServer.resetExpectations(tc.expectedTrace)
|
|
|
|
// Make our call to the API server
|
|
if err := tc.apiCall(clientSet); err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
|
|
// Wait for a span to be recorded from our request
|
|
select {
|
|
case <-fakeServer.traceFound:
|
|
case <-time.After(30 * time.Second):
|
|
t.Fatal("Timed out waiting for trace")
|
|
}
|
|
})
|
|
}
|
|
}
|
|
|
|
// traceServer implements TracesServiceServer, which can receive spans from the
|
|
// API Server via OTLP.
|
|
type traceServer struct {
|
|
t *testing.T
|
|
traceservice.UnimplementedTraceServiceServer
|
|
// the lock guards the per-scenario state below
|
|
lock sync.Mutex
|
|
traceFound chan struct{}
|
|
expectations traceExpectation
|
|
}
|
|
|
|
func (t *traceServer) Export(ctx context.Context, req *traceservice.ExportTraceServiceRequest) (*traceservice.ExportTraceServiceResponse, error) {
|
|
t.lock.Lock()
|
|
defer t.lock.Unlock()
|
|
|
|
t.expectations.update(req)
|
|
// if all expectations are met, notify the test scenario by closing traceFound
|
|
if t.expectations.met() {
|
|
select {
|
|
case <-t.traceFound:
|
|
// traceFound is already closed
|
|
default:
|
|
close(t.traceFound)
|
|
}
|
|
}
|
|
return &traceservice.ExportTraceServiceResponse{}, nil
|
|
}
|
|
|
|
// resetExpectations is used by a new test scenario to set new expectations for
|
|
// the test server.
|
|
func (t *traceServer) resetExpectations(newExpectations traceExpectation) {
|
|
t.lock.Lock()
|
|
defer t.lock.Unlock()
|
|
t.traceFound = make(chan struct{})
|
|
t.expectations = newExpectations
|
|
}
|
|
|
|
// traceExpectation is an expectation for an entire trace
|
|
type traceExpectation []*spanExpectation
|
|
|
|
// met returns true if all span expectations the server is looking for have
|
|
// been satisfied.
|
|
func (t traceExpectation) met() bool {
|
|
if len(t) == 0 {
|
|
return true
|
|
}
|
|
// we want to find any trace ID which all span IDs contain.
|
|
// try each trace ID met by the first span.
|
|
possibleTraceIDs := t[0].metTraceIDs
|
|
for _, tid := range possibleTraceIDs {
|
|
if t.contains(tid) {
|
|
return true
|
|
}
|
|
}
|
|
return false
|
|
}
|
|
|
|
// contains returns true if the all spans in the trace expectation contain the
|
|
// trace ID
|
|
func (t traceExpectation) contains(checkTID string) bool {
|
|
for _, expectation := range t {
|
|
if !expectation.contains(checkTID) {
|
|
return false
|
|
}
|
|
}
|
|
return true
|
|
}
|
|
|
|
// update finds all expectations that are met by a span in the
|
|
// incoming request.
|
|
func (t traceExpectation) update(req *traceservice.ExportTraceServiceRequest) {
|
|
for _, resourceSpans := range req.GetResourceSpans() {
|
|
for _, instrumentationSpans := range resourceSpans.GetScopeSpans() {
|
|
for _, span := range instrumentationSpans.GetSpans() {
|
|
t.updateForSpan(span)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// updateForSpan updates expectations based on a single incoming span.
|
|
func (t traceExpectation) updateForSpan(span *tracev1.Span) {
|
|
for i, spanExpectation := range t {
|
|
if span.Name != spanExpectation.name {
|
|
continue
|
|
}
|
|
if !spanExpectation.attributes.matches(span.GetAttributes()) {
|
|
continue
|
|
}
|
|
if !spanExpectation.events.matches(span.GetEvents()) {
|
|
continue
|
|
}
|
|
t[i].metTraceIDs = append(spanExpectation.metTraceIDs, hex.EncodeToString(span.TraceId[:]))
|
|
}
|
|
|
|
}
|
|
|
|
// spanExpectation is the expectation for a single span
|
|
type spanExpectation struct {
|
|
name string
|
|
attributes attributeExpectation
|
|
events eventExpectation
|
|
// For each trace ID that meets this expectation, record it here.
|
|
// This way, we can ensure that all spans that should be in the same trace have the same trace ID
|
|
metTraceIDs []string
|
|
}
|
|
|
|
func (s *spanExpectation) contains(tid string) bool {
|
|
for _, metTID := range s.metTraceIDs {
|
|
if tid == metTID {
|
|
return true
|
|
}
|
|
}
|
|
return false
|
|
}
|
|
|
|
// eventExpectation is the expectation for an event attached to a span.
|
|
// It is comprised of event names.
|
|
type eventExpectation []string
|
|
|
|
// matches returns true if all expected events exist in the list of input events.
|
|
func (e eventExpectation) matches(events []*tracev1.Span_Event) bool {
|
|
eventMap := map[string]struct{}{}
|
|
for _, event := range events {
|
|
eventMap[event.Name] = struct{}{}
|
|
}
|
|
for _, wantEvent := range e {
|
|
if _, ok := eventMap[wantEvent]; !ok {
|
|
return false
|
|
}
|
|
}
|
|
return true
|
|
}
|
|
|
|
// eventExpectation is the expectation for an event attached to a span.
|
|
// It is a map from attribute key, to a value-matching function.
|
|
type attributeExpectation map[string]func(*commonv1.AnyValue) bool
|
|
|
|
// matches returns true if all expected attributes exist in the intput list of attributes.
|
|
func (a attributeExpectation) matches(attrs []*commonv1.KeyValue) bool {
|
|
attrsMap := map[string]*commonv1.AnyValue{}
|
|
for _, attr := range attrs {
|
|
attrsMap[attr.GetKey()] = attr.GetValue()
|
|
}
|
|
for key, checkVal := range a {
|
|
if val, ok := attrsMap[key]; !ok || !checkVal(val) {
|
|
return false
|
|
}
|
|
}
|
|
return true
|
|
}
|