mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-22 03:11:40 +00:00
test/e2e/instrumentation/logging/utils: remove dead package
The code is not imported anywhere in k/k and therefore should be removed.
This commit is contained in:
parent
f14ebac384
commit
f314c6b831
@ -1,25 +0,0 @@
|
|||||||
/*
|
|
||||||
Copyright 2017 The Kubernetes Authors.
|
|
||||||
|
|
||||||
Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
you may not use this file except in compliance with the License.
|
|
||||||
You may obtain a copy of the License at
|
|
||||||
|
|
||||||
http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
|
|
||||||
Unless required by applicable law or agreed to in writing, software
|
|
||||||
distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
See the License for the specific language governing permissions and
|
|
||||||
limitations under the License.
|
|
||||||
*/
|
|
||||||
|
|
||||||
package utils
|
|
||||||
|
|
||||||
// LogProvider interface provides an API to get logs from the logging backend.
|
|
||||||
type LogProvider interface {
|
|
||||||
Init() error
|
|
||||||
Cleanup()
|
|
||||||
ReadEntries(name string) []LogEntry
|
|
||||||
LoggingAgentName() string
|
|
||||||
}
|
|
@ -1,96 +0,0 @@
|
|||||||
/*
|
|
||||||
Copyright 2017 The Kubernetes Authors.
|
|
||||||
|
|
||||||
Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
you may not use this file except in compliance with the License.
|
|
||||||
You may obtain a copy of the License at
|
|
||||||
|
|
||||||
http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
|
|
||||||
Unless required by applicable law or agreed to in writing, software
|
|
||||||
distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
See the License for the specific language governing permissions and
|
|
||||||
limitations under the License.
|
|
||||||
*/
|
|
||||||
|
|
||||||
package utils
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"fmt"
|
|
||||||
|
|
||||||
v1 "k8s.io/api/core/v1"
|
|
||||||
meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
|
||||||
"k8s.io/apimachinery/pkg/labels"
|
|
||||||
api "k8s.io/kubernetes/pkg/apis/core"
|
|
||||||
"k8s.io/kubernetes/test/e2e/framework"
|
|
||||||
e2enode "k8s.io/kubernetes/test/e2e/framework/node"
|
|
||||||
"k8s.io/utils/integer"
|
|
||||||
)
|
|
||||||
|
|
||||||
// EnsureLoggingAgentDeployment checks that logging agent is present on each
|
|
||||||
// node and returns an error if that's not true.
|
|
||||||
func EnsureLoggingAgentDeployment(f *framework.Framework, appName string) error {
|
|
||||||
agentPods, err := getLoggingAgentPods(f, appName)
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("failed to get logging agent pods: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
agentPerNode := make(map[string]int)
|
|
||||||
for _, pod := range agentPods.Items {
|
|
||||||
agentPerNode[pod.Spec.NodeName]++
|
|
||||||
}
|
|
||||||
|
|
||||||
nodeList, err := e2enode.GetReadySchedulableNodes(f.ClientSet)
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("failed to get nodes: %v", err)
|
|
||||||
}
|
|
||||||
for _, node := range nodeList.Items {
|
|
||||||
agentPodsCount, ok := agentPerNode[node.Name]
|
|
||||||
|
|
||||||
if !ok {
|
|
||||||
return fmt.Errorf("node %s doesn't have logging agents, want 1", node.Name)
|
|
||||||
} else if agentPodsCount != 1 {
|
|
||||||
return fmt.Errorf("node %s has %d logging agents, want 1", node.Name, agentPodsCount)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// EnsureLoggingAgentRestartsCount checks that each logging agent was restarted
|
|
||||||
// no more than maxRestarts times and returns an error if there's a pod which
|
|
||||||
// exceeds this number of restarts.
|
|
||||||
func EnsureLoggingAgentRestartsCount(f *framework.Framework, appName string, maxRestarts int) error {
|
|
||||||
agentPods, err := getLoggingAgentPods(f, appName)
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("failed to get logging agent pods: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
maxRestartCount := 0
|
|
||||||
for _, pod := range agentPods.Items {
|
|
||||||
contStatuses := pod.Status.ContainerStatuses
|
|
||||||
if len(contStatuses) == 0 {
|
|
||||||
framework.Logf("There are no container statuses for pod %s", pod.Name)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
restartCount := int(contStatuses[0].RestartCount)
|
|
||||||
maxRestartCount = integer.IntMax(maxRestartCount, restartCount)
|
|
||||||
|
|
||||||
framework.Logf("Logging agent %s on node %s was restarted %d times",
|
|
||||||
pod.Name, pod.Spec.NodeName, restartCount)
|
|
||||||
}
|
|
||||||
|
|
||||||
if maxRestartCount > maxRestarts {
|
|
||||||
return fmt.Errorf("max logging agent restarts was %d, which is more than allowed %d",
|
|
||||||
maxRestartCount, maxRestarts)
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func getLoggingAgentPods(f *framework.Framework, appName string) (*v1.PodList, error) {
|
|
||||||
label := labels.SelectorFromSet(labels.Set(map[string]string{"k8s-app": appName}))
|
|
||||||
options := meta_v1.ListOptions{LabelSelector: label.String()}
|
|
||||||
return f.ClientSet.CoreV1().Pods(api.NamespaceSystem).List(context.TODO(), options)
|
|
||||||
}
|
|
@ -1,189 +0,0 @@
|
|||||||
/*
|
|
||||||
Copyright 2017 The Kubernetes Authors.
|
|
||||||
|
|
||||||
Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
you may not use this file except in compliance with the License.
|
|
||||||
You may obtain a copy of the License at
|
|
||||||
|
|
||||||
http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
|
|
||||||
Unless required by applicable law or agreed to in writing, software
|
|
||||||
distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
See the License for the specific language governing permissions and
|
|
||||||
limitations under the License.
|
|
||||||
*/
|
|
||||||
|
|
||||||
package utils
|
|
||||||
|
|
||||||
import (
|
|
||||||
"strconv"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"fmt"
|
|
||||||
|
|
||||||
v1 "k8s.io/api/core/v1"
|
|
||||||
"k8s.io/apimachinery/pkg/api/resource"
|
|
||||||
meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
|
||||||
"k8s.io/kubernetes/test/e2e/framework"
|
|
||||||
e2epod "k8s.io/kubernetes/test/e2e/framework/pod"
|
|
||||||
imageutils "k8s.io/kubernetes/test/utils/image"
|
|
||||||
)
|
|
||||||
|
|
||||||
const (
|
|
||||||
// Amount of requested cores for logging container in millicores
|
|
||||||
loggingContainerCPURequest = 10
|
|
||||||
|
|
||||||
// Amount of requested memory for logging container in bytes
|
|
||||||
loggingContainerMemoryRequest = 10 * 1024 * 1024
|
|
||||||
|
|
||||||
// Name of the container used for logging tests
|
|
||||||
loggingContainerName = "logging-container"
|
|
||||||
)
|
|
||||||
|
|
||||||
// LoggingPod is an interface of a pod that can be started and that logs
|
|
||||||
// something to its stdout, possibly indefinitely.
|
|
||||||
type LoggingPod interface {
|
|
||||||
// Name equals to the Kubernetes pod name.
|
|
||||||
Name() string
|
|
||||||
|
|
||||||
// Start method controls when the logging pod is started in the cluster.
|
|
||||||
Start(f *framework.Framework) error
|
|
||||||
}
|
|
||||||
|
|
||||||
// StartAndReturnSelf is a helper method to start a logging pod and
|
|
||||||
// immediately return it.
|
|
||||||
func StartAndReturnSelf(p LoggingPod, f *framework.Framework) (LoggingPod, error) {
|
|
||||||
err := p.Start(f)
|
|
||||||
return p, err
|
|
||||||
}
|
|
||||||
|
|
||||||
// FiniteLoggingPod is a logging pod that emits a known number of log lines.
|
|
||||||
type FiniteLoggingPod interface {
|
|
||||||
LoggingPod
|
|
||||||
|
|
||||||
// ExpectedLinesNumber returns the number of lines that are
|
|
||||||
// expected to be ingested from this pod.
|
|
||||||
ExpectedLineCount() int
|
|
||||||
}
|
|
||||||
|
|
||||||
var _ FiniteLoggingPod = &loadLoggingPod{}
|
|
||||||
|
|
||||||
type loadLoggingPod struct {
|
|
||||||
name string
|
|
||||||
nodeName string
|
|
||||||
expectedLinesCount int
|
|
||||||
runDuration time.Duration
|
|
||||||
}
|
|
||||||
|
|
||||||
// NewLoadLoggingPod returns a logging pod that generates totalLines random
|
|
||||||
// lines over period of length loggingDuration. Lines generated by this
|
|
||||||
// pod are numbered and have well-defined structure.
|
|
||||||
func NewLoadLoggingPod(podName string, nodeName string, totalLines int,
|
|
||||||
loggingDuration time.Duration) FiniteLoggingPod {
|
|
||||||
return &loadLoggingPod{
|
|
||||||
name: podName,
|
|
||||||
nodeName: nodeName,
|
|
||||||
expectedLinesCount: totalLines,
|
|
||||||
runDuration: loggingDuration,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (p *loadLoggingPod) Name() string {
|
|
||||||
return p.name
|
|
||||||
}
|
|
||||||
|
|
||||||
func (p *loadLoggingPod) Start(f *framework.Framework) error {
|
|
||||||
framework.Logf("Starting load logging pod %s", p.name)
|
|
||||||
e2epod.NewPodClient(f).Create(&v1.Pod{
|
|
||||||
ObjectMeta: meta_v1.ObjectMeta{
|
|
||||||
Name: p.name,
|
|
||||||
},
|
|
||||||
Spec: v1.PodSpec{
|
|
||||||
RestartPolicy: v1.RestartPolicyNever,
|
|
||||||
Containers: []v1.Container{
|
|
||||||
{
|
|
||||||
Name: loggingContainerName,
|
|
||||||
Image: imageutils.GetE2EImage(imageutils.Agnhost),
|
|
||||||
Args: []string{"logs-generator", "--log-lines-total", strconv.Itoa(p.expectedLinesCount), "--run-duration", p.runDuration.String()},
|
|
||||||
Resources: v1.ResourceRequirements{
|
|
||||||
Requests: v1.ResourceList{
|
|
||||||
v1.ResourceCPU: *resource.NewMilliQuantity(
|
|
||||||
loggingContainerCPURequest,
|
|
||||||
resource.DecimalSI),
|
|
||||||
v1.ResourceMemory: *resource.NewQuantity(
|
|
||||||
loggingContainerMemoryRequest,
|
|
||||||
resource.BinarySI),
|
|
||||||
},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
NodeName: p.nodeName,
|
|
||||||
},
|
|
||||||
})
|
|
||||||
return e2epod.WaitForPodNameRunningInNamespace(f.ClientSet, p.name, f.Namespace.Name)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (p *loadLoggingPod) ExpectedLineCount() int {
|
|
||||||
return p.expectedLinesCount
|
|
||||||
}
|
|
||||||
|
|
||||||
// NewRepeatingLoggingPod returns a logging pod that each second prints
|
|
||||||
// line value to its stdout.
|
|
||||||
func NewRepeatingLoggingPod(podName string, line string) LoggingPod {
|
|
||||||
cmd := []string{
|
|
||||||
"/bin/sh",
|
|
||||||
"-c",
|
|
||||||
fmt.Sprintf("while :; do echo '%s'; sleep 1; done", line),
|
|
||||||
}
|
|
||||||
return NewExecLoggingPod(podName, cmd)
|
|
||||||
}
|
|
||||||
|
|
||||||
var _ LoggingPod = &execLoggingPod{}
|
|
||||||
|
|
||||||
type execLoggingPod struct {
|
|
||||||
name string
|
|
||||||
cmd []string
|
|
||||||
}
|
|
||||||
|
|
||||||
// NewExecLoggingPod returns a logging pod that produces logs through
|
|
||||||
// executing a command, passed in cmd.
|
|
||||||
func NewExecLoggingPod(podName string, cmd []string) LoggingPod {
|
|
||||||
return &execLoggingPod{
|
|
||||||
name: podName,
|
|
||||||
cmd: cmd,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (p *execLoggingPod) Name() string {
|
|
||||||
return p.name
|
|
||||||
}
|
|
||||||
|
|
||||||
func (p *execLoggingPod) Start(f *framework.Framework) error {
|
|
||||||
framework.Logf("Starting repeating logging pod %s", p.name)
|
|
||||||
e2epod.NewPodClient(f).Create(&v1.Pod{
|
|
||||||
ObjectMeta: meta_v1.ObjectMeta{
|
|
||||||
Name: p.name,
|
|
||||||
},
|
|
||||||
Spec: v1.PodSpec{
|
|
||||||
Containers: []v1.Container{
|
|
||||||
{
|
|
||||||
Name: loggingContainerName,
|
|
||||||
Image: imageutils.GetE2EImage(imageutils.BusyBox),
|
|
||||||
Command: p.cmd,
|
|
||||||
Resources: v1.ResourceRequirements{
|
|
||||||
Requests: v1.ResourceList{
|
|
||||||
v1.ResourceCPU: *resource.NewMilliQuantity(
|
|
||||||
loggingContainerCPURequest,
|
|
||||||
resource.DecimalSI),
|
|
||||||
v1.ResourceMemory: *resource.NewQuantity(
|
|
||||||
loggingContainerMemoryRequest,
|
|
||||||
resource.BinarySI),
|
|
||||||
},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
})
|
|
||||||
return e2epod.WaitForPodNameRunningInNamespace(f.ClientSet, p.name, f.Namespace.Name)
|
|
||||||
}
|
|
@ -1,34 +0,0 @@
|
|||||||
/*
|
|
||||||
Copyright 2017 The Kubernetes Authors.
|
|
||||||
|
|
||||||
Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
you may not use this file except in compliance with the License.
|
|
||||||
You may obtain a copy of the License at
|
|
||||||
|
|
||||||
http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
|
|
||||||
Unless required by applicable law or agreed to in writing, software
|
|
||||||
distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
See the License for the specific language governing permissions and
|
|
||||||
limitations under the License.
|
|
||||||
*/
|
|
||||||
|
|
||||||
package utils
|
|
||||||
|
|
||||||
import (
|
|
||||||
clientset "k8s.io/client-go/kubernetes"
|
|
||||||
"k8s.io/kubernetes/test/e2e/framework"
|
|
||||||
e2enode "k8s.io/kubernetes/test/e2e/framework/node"
|
|
||||||
)
|
|
||||||
|
|
||||||
// GetNodeIds returns the list of node names and panics in case of failure.
|
|
||||||
func GetNodeIds(cs clientset.Interface) []string {
|
|
||||||
nodes, err := e2enode.GetReadySchedulableNodes(cs)
|
|
||||||
framework.ExpectNoError(err)
|
|
||||||
nodeIds := []string{}
|
|
||||||
for _, n := range nodes.Items {
|
|
||||||
nodeIds = append(nodeIds, n.Name)
|
|
||||||
}
|
|
||||||
return nodeIds
|
|
||||||
}
|
|
@ -1,108 +0,0 @@
|
|||||||
/*
|
|
||||||
Copyright 2017 The Kubernetes Authors.
|
|
||||||
|
|
||||||
Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
you may not use this file except in compliance with the License.
|
|
||||||
You may obtain a copy of the License at
|
|
||||||
|
|
||||||
http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
|
|
||||||
Unless required by applicable law or agreed to in writing, software
|
|
||||||
distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
See the License for the specific language governing permissions and
|
|
||||||
limitations under the License.
|
|
||||||
*/
|
|
||||||
|
|
||||||
package utils
|
|
||||||
|
|
||||||
import (
|
|
||||||
"regexp"
|
|
||||||
"strconv"
|
|
||||||
|
|
||||||
"sync"
|
|
||||||
)
|
|
||||||
|
|
||||||
var (
|
|
||||||
// Regexp, matching the contents of log entries, parsed or not
|
|
||||||
logEntryMessageRegex = regexp.MustCompile("(?:I\\d+ \\d+:\\d+:\\d+.\\d+ {7}\\d+ logs_generator.go:67] )?(\\d+) .*")
|
|
||||||
)
|
|
||||||
|
|
||||||
// LogEntry represents a log entry, received from the logging backend.
|
|
||||||
type LogEntry struct {
|
|
||||||
LogName string
|
|
||||||
TextPayload string
|
|
||||||
Location string
|
|
||||||
JSONPayload map[string]interface{}
|
|
||||||
}
|
|
||||||
|
|
||||||
// TryGetEntryNumber returns the number of the log entry in sequence, if it
|
|
||||||
// was generated by the load logging pod (requires special log format).
|
|
||||||
func (entry LogEntry) TryGetEntryNumber() (int, bool) {
|
|
||||||
submatch := logEntryMessageRegex.FindStringSubmatch(entry.TextPayload)
|
|
||||||
if submatch == nil || len(submatch) < 2 {
|
|
||||||
return 0, false
|
|
||||||
}
|
|
||||||
|
|
||||||
lineNumber, err := strconv.Atoi(submatch[1])
|
|
||||||
return lineNumber, err == nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// LogsQueueCollection is a thread-safe set of named log queues.
|
|
||||||
type LogsQueueCollection interface {
|
|
||||||
Push(name string, logs ...LogEntry)
|
|
||||||
Pop(name string) []LogEntry
|
|
||||||
}
|
|
||||||
|
|
||||||
var _ LogsQueueCollection = &logsQueueCollection{}
|
|
||||||
|
|
||||||
type logsQueueCollection struct {
|
|
||||||
mutex *sync.Mutex
|
|
||||||
queues map[string]chan LogEntry
|
|
||||||
queueSize int
|
|
||||||
}
|
|
||||||
|
|
||||||
// NewLogsQueueCollection returns a new LogsQueueCollection where each queue
|
|
||||||
// is created with a default size of queueSize.
|
|
||||||
func NewLogsQueueCollection(queueSize int) LogsQueueCollection {
|
|
||||||
return &logsQueueCollection{
|
|
||||||
mutex: &sync.Mutex{},
|
|
||||||
queues: map[string]chan LogEntry{},
|
|
||||||
queueSize: queueSize,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *logsQueueCollection) Push(name string, logs ...LogEntry) {
|
|
||||||
q := c.getQueue(name)
|
|
||||||
for _, log := range logs {
|
|
||||||
q <- log
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *logsQueueCollection) Pop(name string) []LogEntry {
|
|
||||||
q := c.getQueue(name)
|
|
||||||
var entries []LogEntry
|
|
||||||
polling_loop:
|
|
||||||
for {
|
|
||||||
select {
|
|
||||||
case entry := <-q:
|
|
||||||
entries = append(entries, entry)
|
|
||||||
default:
|
|
||||||
break polling_loop
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return entries
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *logsQueueCollection) getQueue(name string) chan LogEntry {
|
|
||||||
c.mutex.Lock()
|
|
||||||
defer c.mutex.Unlock()
|
|
||||||
|
|
||||||
if q, ok := c.queues[name]; ok {
|
|
||||||
return q
|
|
||||||
}
|
|
||||||
|
|
||||||
newQ := make(chan LogEntry, c.queueSize)
|
|
||||||
c.queues[name] = newQ
|
|
||||||
return newQ
|
|
||||||
}
|
|
@ -1,230 +0,0 @@
|
|||||||
/*
|
|
||||||
Copyright 2017 The Kubernetes Authors.
|
|
||||||
|
|
||||||
Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
you may not use this file except in compliance with the License.
|
|
||||||
You may obtain a copy of the License at
|
|
||||||
|
|
||||||
http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
|
|
||||||
Unless required by applicable law or agreed to in writing, software
|
|
||||||
distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
See the License for the specific language governing permissions and
|
|
||||||
limitations under the License.
|
|
||||||
*/
|
|
||||||
|
|
||||||
package utils
|
|
||||||
|
|
||||||
import (
|
|
||||||
"fmt"
|
|
||||||
"strings"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"k8s.io/apimachinery/pkg/util/wait"
|
|
||||||
"k8s.io/kubernetes/test/e2e/framework"
|
|
||||||
)
|
|
||||||
|
|
||||||
// LogChecker is an interface for an entity that can check whether logging
|
|
||||||
// backend contains all wanted log entries.
|
|
||||||
type LogChecker interface {
|
|
||||||
EntriesIngested() (bool, error)
|
|
||||||
Timeout() error
|
|
||||||
}
|
|
||||||
|
|
||||||
// IngestionPred is a type of a function that checks whether all required
|
|
||||||
// log entries were ingested.
|
|
||||||
type IngestionPred func(string, []LogEntry) (bool, error)
|
|
||||||
|
|
||||||
// UntilFirstEntry is a IngestionPred that checks that at least one entry was
|
|
||||||
// ingested.
|
|
||||||
var UntilFirstEntry IngestionPred = func(_ string, entries []LogEntry) (bool, error) {
|
|
||||||
return len(entries) > 0, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// UntilFirstEntryFromLog is a IngestionPred that checks that at least one
|
|
||||||
// entry from the log with a given name was ingested.
|
|
||||||
func UntilFirstEntryFromLog(log string) IngestionPred {
|
|
||||||
return func(_ string, entries []LogEntry) (bool, error) {
|
|
||||||
for _, e := range entries {
|
|
||||||
if e.LogName == log {
|
|
||||||
return true, nil
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return false, nil
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// UntilFirstEntryFromLocation is a IngestionPred that checks that at least one
|
|
||||||
// entry from the log with a given name was ingested.
|
|
||||||
func UntilFirstEntryFromLocation(location string) IngestionPred {
|
|
||||||
return func(_ string, entries []LogEntry) (bool, error) {
|
|
||||||
for _, e := range entries {
|
|
||||||
if e.Location == location {
|
|
||||||
return true, nil
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return false, nil
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// TimeoutFun is a function that is called when the waiting times out.
|
|
||||||
type TimeoutFun func([]string, []bool) error
|
|
||||||
|
|
||||||
// JustTimeout returns the error with the list of names for which backend is
|
|
||||||
// still still missing logs.
|
|
||||||
var JustTimeout TimeoutFun = func(names []string, ingested []bool) error {
|
|
||||||
failedNames := []string{}
|
|
||||||
for i, name := range names {
|
|
||||||
if !ingested[i] {
|
|
||||||
failedNames = append(failedNames, name)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return fmt.Errorf("timed out waiting for ingestion, still not ingested: %s",
|
|
||||||
strings.Join(failedNames, ","))
|
|
||||||
}
|
|
||||||
|
|
||||||
var _ LogChecker = &logChecker{}
|
|
||||||
|
|
||||||
type logChecker struct {
|
|
||||||
provider LogProvider
|
|
||||||
names []string
|
|
||||||
ingested []bool
|
|
||||||
ingestionPred IngestionPred
|
|
||||||
timeoutFun TimeoutFun
|
|
||||||
}
|
|
||||||
|
|
||||||
// NewLogChecker constructs a LogChecker for a list of names from custom
|
|
||||||
// IngestionPred and TimeoutFun.
|
|
||||||
func NewLogChecker(p LogProvider, pred IngestionPred, timeout TimeoutFun, names ...string) LogChecker {
|
|
||||||
return &logChecker{
|
|
||||||
provider: p,
|
|
||||||
names: names,
|
|
||||||
ingested: make([]bool, len(names)),
|
|
||||||
ingestionPred: pred,
|
|
||||||
timeoutFun: timeout,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *logChecker) EntriesIngested() (bool, error) {
|
|
||||||
allIngested := true
|
|
||||||
for i, name := range c.names {
|
|
||||||
if c.ingested[i] {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
entries := c.provider.ReadEntries(name)
|
|
||||||
ingested, err := c.ingestionPred(name, entries)
|
|
||||||
if err != nil {
|
|
||||||
return false, err
|
|
||||||
}
|
|
||||||
if ingested {
|
|
||||||
c.ingested[i] = true
|
|
||||||
}
|
|
||||||
allIngested = allIngested && ingested
|
|
||||||
}
|
|
||||||
return allIngested, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *logChecker) Timeout() error {
|
|
||||||
return c.timeoutFun(c.names, c.ingested)
|
|
||||||
}
|
|
||||||
|
|
||||||
// NumberedIngestionPred is a IngestionPred that takes into account sequential
|
|
||||||
// numbers of ingested entries.
|
|
||||||
type NumberedIngestionPred func(string, map[int]bool) (bool, error)
|
|
||||||
|
|
||||||
// NumberedTimeoutFun is a TimeoutFun that takes into account sequential
|
|
||||||
// numbers of ingested entries.
|
|
||||||
type NumberedTimeoutFun func([]string, map[string]map[int]bool) error
|
|
||||||
|
|
||||||
// NewNumberedLogChecker returns a log checker that works with numbered log
|
|
||||||
// entries generated by load logging pods.
|
|
||||||
func NewNumberedLogChecker(p LogProvider, pred NumberedIngestionPred,
|
|
||||||
timeout NumberedTimeoutFun, names ...string) LogChecker {
|
|
||||||
occs := map[string]map[int]bool{}
|
|
||||||
return NewLogChecker(p, func(name string, entries []LogEntry) (bool, error) {
|
|
||||||
occ, ok := occs[name]
|
|
||||||
if !ok {
|
|
||||||
occ = map[int]bool{}
|
|
||||||
occs[name] = occ
|
|
||||||
}
|
|
||||||
for _, entry := range entries {
|
|
||||||
if no, ok := entry.TryGetEntryNumber(); ok {
|
|
||||||
occ[no] = true
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return pred(name, occ)
|
|
||||||
}, func(names []string, _ []bool) error {
|
|
||||||
return timeout(names, occs)
|
|
||||||
}, names...)
|
|
||||||
}
|
|
||||||
|
|
||||||
// NewFullIngestionPodLogChecker returns a log checks that works with numbered
|
|
||||||
// log entries generated by load logging pods and waits until all entries are
|
|
||||||
// ingested. If timeout is reached, fraction is lost logs up to slack is
|
|
||||||
// considered tolerable.
|
|
||||||
func NewFullIngestionPodLogChecker(p LogProvider, slack float64, pods ...FiniteLoggingPod) LogChecker {
|
|
||||||
podsMap := map[string]FiniteLoggingPod{}
|
|
||||||
for _, p := range pods {
|
|
||||||
podsMap[p.Name()] = p
|
|
||||||
}
|
|
||||||
return NewNumberedLogChecker(p, getFullIngestionPred(podsMap),
|
|
||||||
getFullIngestionTimeout(podsMap, slack), getFiniteLoggingPodNames(pods)...)
|
|
||||||
}
|
|
||||||
|
|
||||||
func getFullIngestionPred(podsMap map[string]FiniteLoggingPod) NumberedIngestionPred {
|
|
||||||
return func(name string, occ map[int]bool) (bool, error) {
|
|
||||||
p := podsMap[name]
|
|
||||||
ok := len(occ) == p.ExpectedLineCount()
|
|
||||||
return ok, nil
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func getFullIngestionTimeout(podsMap map[string]FiniteLoggingPod, slack float64) NumberedTimeoutFun {
|
|
||||||
return func(names []string, occs map[string]map[int]bool) error {
|
|
||||||
totalGot, totalWant := 0, 0
|
|
||||||
lossMsgs := []string{}
|
|
||||||
for _, name := range names {
|
|
||||||
got := len(occs[name])
|
|
||||||
want := podsMap[name].ExpectedLineCount()
|
|
||||||
if got != want {
|
|
||||||
lossMsg := fmt.Sprintf("%s: %d lines", name, want-got)
|
|
||||||
lossMsgs = append(lossMsgs, lossMsg)
|
|
||||||
}
|
|
||||||
totalGot += got
|
|
||||||
totalWant += want
|
|
||||||
}
|
|
||||||
if len(lossMsgs) > 0 {
|
|
||||||
framework.Logf("Still missing logs from:\n%s", strings.Join(lossMsgs, "\n"))
|
|
||||||
}
|
|
||||||
lostFrac := 1 - float64(totalGot)/float64(totalWant)
|
|
||||||
if lostFrac > slack {
|
|
||||||
return fmt.Errorf("still missing %.2f%% of logs, only %.2f%% is tolerable",
|
|
||||||
lostFrac*100, slack*100)
|
|
||||||
}
|
|
||||||
framework.Logf("Missing %.2f%% of logs, which is lower than the threshold %.2f%%",
|
|
||||||
lostFrac*100, slack*100)
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// WaitForLogs checks that logs are ingested, as reported by the log checker
|
|
||||||
// until the timeout has passed. Function sleeps for interval between two
|
|
||||||
// log ingestion checks.
|
|
||||||
func WaitForLogs(c LogChecker, interval, timeout time.Duration) error {
|
|
||||||
err := wait.Poll(interval, timeout, func() (bool, error) {
|
|
||||||
return c.EntriesIngested()
|
|
||||||
})
|
|
||||||
if err == wait.ErrWaitTimeout {
|
|
||||||
return c.Timeout()
|
|
||||||
}
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
func getFiniteLoggingPodNames(pods []FiniteLoggingPod) []string {
|
|
||||||
names := []string{}
|
|
||||||
for _, p := range pods {
|
|
||||||
names = append(names, p.Name())
|
|
||||||
}
|
|
||||||
return names
|
|
||||||
}
|
|
Loading…
Reference in New Issue
Block a user