mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-21 10:51:29 +00:00
Merge pull request #113213 from pohly/e2e-instrumentation-dead-code-removal
test/e2e/instrumentation/logging/utils: remove dead package
This commit is contained in:
commit
f3ae27f5ef
@ -1,25 +0,0 @@
|
||||
/*
|
||||
Copyright 2017 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package utils
|
||||
|
||||
// LogProvider interface provides an API to get logs from the logging backend.
|
||||
type LogProvider interface {
|
||||
Init() error
|
||||
Cleanup()
|
||||
ReadEntries(name string) []LogEntry
|
||||
LoggingAgentName() string
|
||||
}
|
@ -1,96 +0,0 @@
|
||||
/*
|
||||
Copyright 2017 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package utils
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
|
||||
v1 "k8s.io/api/core/v1"
|
||||
meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/labels"
|
||||
api "k8s.io/kubernetes/pkg/apis/core"
|
||||
"k8s.io/kubernetes/test/e2e/framework"
|
||||
e2enode "k8s.io/kubernetes/test/e2e/framework/node"
|
||||
"k8s.io/utils/integer"
|
||||
)
|
||||
|
||||
// EnsureLoggingAgentDeployment checks that logging agent is present on each
|
||||
// node and returns an error if that's not true.
|
||||
func EnsureLoggingAgentDeployment(f *framework.Framework, appName string) error {
|
||||
agentPods, err := getLoggingAgentPods(f, appName)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to get logging agent pods: %v", err)
|
||||
}
|
||||
|
||||
agentPerNode := make(map[string]int)
|
||||
for _, pod := range agentPods.Items {
|
||||
agentPerNode[pod.Spec.NodeName]++
|
||||
}
|
||||
|
||||
nodeList, err := e2enode.GetReadySchedulableNodes(f.ClientSet)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to get nodes: %v", err)
|
||||
}
|
||||
for _, node := range nodeList.Items {
|
||||
agentPodsCount, ok := agentPerNode[node.Name]
|
||||
|
||||
if !ok {
|
||||
return fmt.Errorf("node %s doesn't have logging agents, want 1", node.Name)
|
||||
} else if agentPodsCount != 1 {
|
||||
return fmt.Errorf("node %s has %d logging agents, want 1", node.Name, agentPodsCount)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// EnsureLoggingAgentRestartsCount checks that each logging agent was restarted
|
||||
// no more than maxRestarts times and returns an error if there's a pod which
|
||||
// exceeds this number of restarts.
|
||||
func EnsureLoggingAgentRestartsCount(f *framework.Framework, appName string, maxRestarts int) error {
|
||||
agentPods, err := getLoggingAgentPods(f, appName)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to get logging agent pods: %v", err)
|
||||
}
|
||||
|
||||
maxRestartCount := 0
|
||||
for _, pod := range agentPods.Items {
|
||||
contStatuses := pod.Status.ContainerStatuses
|
||||
if len(contStatuses) == 0 {
|
||||
framework.Logf("There are no container statuses for pod %s", pod.Name)
|
||||
continue
|
||||
}
|
||||
restartCount := int(contStatuses[0].RestartCount)
|
||||
maxRestartCount = integer.IntMax(maxRestartCount, restartCount)
|
||||
|
||||
framework.Logf("Logging agent %s on node %s was restarted %d times",
|
||||
pod.Name, pod.Spec.NodeName, restartCount)
|
||||
}
|
||||
|
||||
if maxRestartCount > maxRestarts {
|
||||
return fmt.Errorf("max logging agent restarts was %d, which is more than allowed %d",
|
||||
maxRestartCount, maxRestarts)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func getLoggingAgentPods(f *framework.Framework, appName string) (*v1.PodList, error) {
|
||||
label := labels.SelectorFromSet(labels.Set(map[string]string{"k8s-app": appName}))
|
||||
options := meta_v1.ListOptions{LabelSelector: label.String()}
|
||||
return f.ClientSet.CoreV1().Pods(api.NamespaceSystem).List(context.TODO(), options)
|
||||
}
|
@ -1,189 +0,0 @@
|
||||
/*
|
||||
Copyright 2017 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package utils
|
||||
|
||||
import (
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"fmt"
|
||||
|
||||
v1 "k8s.io/api/core/v1"
|
||||
"k8s.io/apimachinery/pkg/api/resource"
|
||||
meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/kubernetes/test/e2e/framework"
|
||||
e2epod "k8s.io/kubernetes/test/e2e/framework/pod"
|
||||
imageutils "k8s.io/kubernetes/test/utils/image"
|
||||
)
|
||||
|
||||
const (
|
||||
// Amount of requested cores for logging container in millicores
|
||||
loggingContainerCPURequest = 10
|
||||
|
||||
// Amount of requested memory for logging container in bytes
|
||||
loggingContainerMemoryRequest = 10 * 1024 * 1024
|
||||
|
||||
// Name of the container used for logging tests
|
||||
loggingContainerName = "logging-container"
|
||||
)
|
||||
|
||||
// LoggingPod is an interface of a pod that can be started and that logs
|
||||
// something to its stdout, possibly indefinitely.
|
||||
type LoggingPod interface {
|
||||
// Name equals to the Kubernetes pod name.
|
||||
Name() string
|
||||
|
||||
// Start method controls when the logging pod is started in the cluster.
|
||||
Start(f *framework.Framework) error
|
||||
}
|
||||
|
||||
// StartAndReturnSelf is a helper method to start a logging pod and
|
||||
// immediately return it.
|
||||
func StartAndReturnSelf(p LoggingPod, f *framework.Framework) (LoggingPod, error) {
|
||||
err := p.Start(f)
|
||||
return p, err
|
||||
}
|
||||
|
||||
// FiniteLoggingPod is a logging pod that emits a known number of log lines.
|
||||
type FiniteLoggingPod interface {
|
||||
LoggingPod
|
||||
|
||||
// ExpectedLinesNumber returns the number of lines that are
|
||||
// expected to be ingested from this pod.
|
||||
ExpectedLineCount() int
|
||||
}
|
||||
|
||||
var _ FiniteLoggingPod = &loadLoggingPod{}
|
||||
|
||||
type loadLoggingPod struct {
|
||||
name string
|
||||
nodeName string
|
||||
expectedLinesCount int
|
||||
runDuration time.Duration
|
||||
}
|
||||
|
||||
// NewLoadLoggingPod returns a logging pod that generates totalLines random
|
||||
// lines over period of length loggingDuration. Lines generated by this
|
||||
// pod are numbered and have well-defined structure.
|
||||
func NewLoadLoggingPod(podName string, nodeName string, totalLines int,
|
||||
loggingDuration time.Duration) FiniteLoggingPod {
|
||||
return &loadLoggingPod{
|
||||
name: podName,
|
||||
nodeName: nodeName,
|
||||
expectedLinesCount: totalLines,
|
||||
runDuration: loggingDuration,
|
||||
}
|
||||
}
|
||||
|
||||
func (p *loadLoggingPod) Name() string {
|
||||
return p.name
|
||||
}
|
||||
|
||||
func (p *loadLoggingPod) Start(f *framework.Framework) error {
|
||||
framework.Logf("Starting load logging pod %s", p.name)
|
||||
e2epod.NewPodClient(f).Create(&v1.Pod{
|
||||
ObjectMeta: meta_v1.ObjectMeta{
|
||||
Name: p.name,
|
||||
},
|
||||
Spec: v1.PodSpec{
|
||||
RestartPolicy: v1.RestartPolicyNever,
|
||||
Containers: []v1.Container{
|
||||
{
|
||||
Name: loggingContainerName,
|
||||
Image: imageutils.GetE2EImage(imageutils.Agnhost),
|
||||
Args: []string{"logs-generator", "--log-lines-total", strconv.Itoa(p.expectedLinesCount), "--run-duration", p.runDuration.String()},
|
||||
Resources: v1.ResourceRequirements{
|
||||
Requests: v1.ResourceList{
|
||||
v1.ResourceCPU: *resource.NewMilliQuantity(
|
||||
loggingContainerCPURequest,
|
||||
resource.DecimalSI),
|
||||
v1.ResourceMemory: *resource.NewQuantity(
|
||||
loggingContainerMemoryRequest,
|
||||
resource.BinarySI),
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
NodeName: p.nodeName,
|
||||
},
|
||||
})
|
||||
return e2epod.WaitForPodNameRunningInNamespace(f.ClientSet, p.name, f.Namespace.Name)
|
||||
}
|
||||
|
||||
func (p *loadLoggingPod) ExpectedLineCount() int {
|
||||
return p.expectedLinesCount
|
||||
}
|
||||
|
||||
// NewRepeatingLoggingPod returns a logging pod that each second prints
|
||||
// line value to its stdout.
|
||||
func NewRepeatingLoggingPod(podName string, line string) LoggingPod {
|
||||
cmd := []string{
|
||||
"/bin/sh",
|
||||
"-c",
|
||||
fmt.Sprintf("while :; do echo '%s'; sleep 1; done", line),
|
||||
}
|
||||
return NewExecLoggingPod(podName, cmd)
|
||||
}
|
||||
|
||||
var _ LoggingPod = &execLoggingPod{}
|
||||
|
||||
type execLoggingPod struct {
|
||||
name string
|
||||
cmd []string
|
||||
}
|
||||
|
||||
// NewExecLoggingPod returns a logging pod that produces logs through
|
||||
// executing a command, passed in cmd.
|
||||
func NewExecLoggingPod(podName string, cmd []string) LoggingPod {
|
||||
return &execLoggingPod{
|
||||
name: podName,
|
||||
cmd: cmd,
|
||||
}
|
||||
}
|
||||
|
||||
func (p *execLoggingPod) Name() string {
|
||||
return p.name
|
||||
}
|
||||
|
||||
func (p *execLoggingPod) Start(f *framework.Framework) error {
|
||||
framework.Logf("Starting repeating logging pod %s", p.name)
|
||||
e2epod.NewPodClient(f).Create(&v1.Pod{
|
||||
ObjectMeta: meta_v1.ObjectMeta{
|
||||
Name: p.name,
|
||||
},
|
||||
Spec: v1.PodSpec{
|
||||
Containers: []v1.Container{
|
||||
{
|
||||
Name: loggingContainerName,
|
||||
Image: imageutils.GetE2EImage(imageutils.BusyBox),
|
||||
Command: p.cmd,
|
||||
Resources: v1.ResourceRequirements{
|
||||
Requests: v1.ResourceList{
|
||||
v1.ResourceCPU: *resource.NewMilliQuantity(
|
||||
loggingContainerCPURequest,
|
||||
resource.DecimalSI),
|
||||
v1.ResourceMemory: *resource.NewQuantity(
|
||||
loggingContainerMemoryRequest,
|
||||
resource.BinarySI),
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
})
|
||||
return e2epod.WaitForPodNameRunningInNamespace(f.ClientSet, p.name, f.Namespace.Name)
|
||||
}
|
@ -1,34 +0,0 @@
|
||||
/*
|
||||
Copyright 2017 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package utils
|
||||
|
||||
import (
|
||||
clientset "k8s.io/client-go/kubernetes"
|
||||
"k8s.io/kubernetes/test/e2e/framework"
|
||||
e2enode "k8s.io/kubernetes/test/e2e/framework/node"
|
||||
)
|
||||
|
||||
// GetNodeIds returns the list of node names and panics in case of failure.
|
||||
func GetNodeIds(cs clientset.Interface) []string {
|
||||
nodes, err := e2enode.GetReadySchedulableNodes(cs)
|
||||
framework.ExpectNoError(err)
|
||||
nodeIds := []string{}
|
||||
for _, n := range nodes.Items {
|
||||
nodeIds = append(nodeIds, n.Name)
|
||||
}
|
||||
return nodeIds
|
||||
}
|
@ -1,108 +0,0 @@
|
||||
/*
|
||||
Copyright 2017 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package utils
|
||||
|
||||
import (
|
||||
"regexp"
|
||||
"strconv"
|
||||
|
||||
"sync"
|
||||
)
|
||||
|
||||
var (
|
||||
// Regexp, matching the contents of log entries, parsed or not
|
||||
logEntryMessageRegex = regexp.MustCompile("(?:I\\d+ \\d+:\\d+:\\d+.\\d+ {7}\\d+ logs_generator.go:67] )?(\\d+) .*")
|
||||
)
|
||||
|
||||
// LogEntry represents a log entry, received from the logging backend.
|
||||
type LogEntry struct {
|
||||
LogName string
|
||||
TextPayload string
|
||||
Location string
|
||||
JSONPayload map[string]interface{}
|
||||
}
|
||||
|
||||
// TryGetEntryNumber returns the number of the log entry in sequence, if it
|
||||
// was generated by the load logging pod (requires special log format).
|
||||
func (entry LogEntry) TryGetEntryNumber() (int, bool) {
|
||||
submatch := logEntryMessageRegex.FindStringSubmatch(entry.TextPayload)
|
||||
if submatch == nil || len(submatch) < 2 {
|
||||
return 0, false
|
||||
}
|
||||
|
||||
lineNumber, err := strconv.Atoi(submatch[1])
|
||||
return lineNumber, err == nil
|
||||
}
|
||||
|
||||
// LogsQueueCollection is a thread-safe set of named log queues.
|
||||
type LogsQueueCollection interface {
|
||||
Push(name string, logs ...LogEntry)
|
||||
Pop(name string) []LogEntry
|
||||
}
|
||||
|
||||
var _ LogsQueueCollection = &logsQueueCollection{}
|
||||
|
||||
type logsQueueCollection struct {
|
||||
mutex *sync.Mutex
|
||||
queues map[string]chan LogEntry
|
||||
queueSize int
|
||||
}
|
||||
|
||||
// NewLogsQueueCollection returns a new LogsQueueCollection where each queue
|
||||
// is created with a default size of queueSize.
|
||||
func NewLogsQueueCollection(queueSize int) LogsQueueCollection {
|
||||
return &logsQueueCollection{
|
||||
mutex: &sync.Mutex{},
|
||||
queues: map[string]chan LogEntry{},
|
||||
queueSize: queueSize,
|
||||
}
|
||||
}
|
||||
|
||||
func (c *logsQueueCollection) Push(name string, logs ...LogEntry) {
|
||||
q := c.getQueue(name)
|
||||
for _, log := range logs {
|
||||
q <- log
|
||||
}
|
||||
}
|
||||
|
||||
func (c *logsQueueCollection) Pop(name string) []LogEntry {
|
||||
q := c.getQueue(name)
|
||||
var entries []LogEntry
|
||||
polling_loop:
|
||||
for {
|
||||
select {
|
||||
case entry := <-q:
|
||||
entries = append(entries, entry)
|
||||
default:
|
||||
break polling_loop
|
||||
}
|
||||
}
|
||||
return entries
|
||||
}
|
||||
|
||||
func (c *logsQueueCollection) getQueue(name string) chan LogEntry {
|
||||
c.mutex.Lock()
|
||||
defer c.mutex.Unlock()
|
||||
|
||||
if q, ok := c.queues[name]; ok {
|
||||
return q
|
||||
}
|
||||
|
||||
newQ := make(chan LogEntry, c.queueSize)
|
||||
c.queues[name] = newQ
|
||||
return newQ
|
||||
}
|
@ -1,230 +0,0 @@
|
||||
/*
|
||||
Copyright 2017 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package utils
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
"k8s.io/kubernetes/test/e2e/framework"
|
||||
)
|
||||
|
||||
// LogChecker is an interface for an entity that can check whether logging
|
||||
// backend contains all wanted log entries.
|
||||
type LogChecker interface {
|
||||
EntriesIngested() (bool, error)
|
||||
Timeout() error
|
||||
}
|
||||
|
||||
// IngestionPred is a type of a function that checks whether all required
|
||||
// log entries were ingested.
|
||||
type IngestionPred func(string, []LogEntry) (bool, error)
|
||||
|
||||
// UntilFirstEntry is a IngestionPred that checks that at least one entry was
|
||||
// ingested.
|
||||
var UntilFirstEntry IngestionPred = func(_ string, entries []LogEntry) (bool, error) {
|
||||
return len(entries) > 0, nil
|
||||
}
|
||||
|
||||
// UntilFirstEntryFromLog is a IngestionPred that checks that at least one
|
||||
// entry from the log with a given name was ingested.
|
||||
func UntilFirstEntryFromLog(log string) IngestionPred {
|
||||
return func(_ string, entries []LogEntry) (bool, error) {
|
||||
for _, e := range entries {
|
||||
if e.LogName == log {
|
||||
return true, nil
|
||||
}
|
||||
}
|
||||
return false, nil
|
||||
}
|
||||
}
|
||||
|
||||
// UntilFirstEntryFromLocation is a IngestionPred that checks that at least one
|
||||
// entry from the log with a given name was ingested.
|
||||
func UntilFirstEntryFromLocation(location string) IngestionPred {
|
||||
return func(_ string, entries []LogEntry) (bool, error) {
|
||||
for _, e := range entries {
|
||||
if e.Location == location {
|
||||
return true, nil
|
||||
}
|
||||
}
|
||||
return false, nil
|
||||
}
|
||||
}
|
||||
|
||||
// TimeoutFun is a function that is called when the waiting times out.
|
||||
type TimeoutFun func([]string, []bool) error
|
||||
|
||||
// JustTimeout returns the error with the list of names for which backend is
|
||||
// still still missing logs.
|
||||
var JustTimeout TimeoutFun = func(names []string, ingested []bool) error {
|
||||
failedNames := []string{}
|
||||
for i, name := range names {
|
||||
if !ingested[i] {
|
||||
failedNames = append(failedNames, name)
|
||||
}
|
||||
}
|
||||
return fmt.Errorf("timed out waiting for ingestion, still not ingested: %s",
|
||||
strings.Join(failedNames, ","))
|
||||
}
|
||||
|
||||
var _ LogChecker = &logChecker{}
|
||||
|
||||
type logChecker struct {
|
||||
provider LogProvider
|
||||
names []string
|
||||
ingested []bool
|
||||
ingestionPred IngestionPred
|
||||
timeoutFun TimeoutFun
|
||||
}
|
||||
|
||||
// NewLogChecker constructs a LogChecker for a list of names from custom
|
||||
// IngestionPred and TimeoutFun.
|
||||
func NewLogChecker(p LogProvider, pred IngestionPred, timeout TimeoutFun, names ...string) LogChecker {
|
||||
return &logChecker{
|
||||
provider: p,
|
||||
names: names,
|
||||
ingested: make([]bool, len(names)),
|
||||
ingestionPred: pred,
|
||||
timeoutFun: timeout,
|
||||
}
|
||||
}
|
||||
|
||||
func (c *logChecker) EntriesIngested() (bool, error) {
|
||||
allIngested := true
|
||||
for i, name := range c.names {
|
||||
if c.ingested[i] {
|
||||
continue
|
||||
}
|
||||
entries := c.provider.ReadEntries(name)
|
||||
ingested, err := c.ingestionPred(name, entries)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
if ingested {
|
||||
c.ingested[i] = true
|
||||
}
|
||||
allIngested = allIngested && ingested
|
||||
}
|
||||
return allIngested, nil
|
||||
}
|
||||
|
||||
func (c *logChecker) Timeout() error {
|
||||
return c.timeoutFun(c.names, c.ingested)
|
||||
}
|
||||
|
||||
// NumberedIngestionPred is a IngestionPred that takes into account sequential
|
||||
// numbers of ingested entries.
|
||||
type NumberedIngestionPred func(string, map[int]bool) (bool, error)
|
||||
|
||||
// NumberedTimeoutFun is a TimeoutFun that takes into account sequential
|
||||
// numbers of ingested entries.
|
||||
type NumberedTimeoutFun func([]string, map[string]map[int]bool) error
|
||||
|
||||
// NewNumberedLogChecker returns a log checker that works with numbered log
|
||||
// entries generated by load logging pods.
|
||||
func NewNumberedLogChecker(p LogProvider, pred NumberedIngestionPred,
|
||||
timeout NumberedTimeoutFun, names ...string) LogChecker {
|
||||
occs := map[string]map[int]bool{}
|
||||
return NewLogChecker(p, func(name string, entries []LogEntry) (bool, error) {
|
||||
occ, ok := occs[name]
|
||||
if !ok {
|
||||
occ = map[int]bool{}
|
||||
occs[name] = occ
|
||||
}
|
||||
for _, entry := range entries {
|
||||
if no, ok := entry.TryGetEntryNumber(); ok {
|
||||
occ[no] = true
|
||||
}
|
||||
}
|
||||
return pred(name, occ)
|
||||
}, func(names []string, _ []bool) error {
|
||||
return timeout(names, occs)
|
||||
}, names...)
|
||||
}
|
||||
|
||||
// NewFullIngestionPodLogChecker returns a log checks that works with numbered
|
||||
// log entries generated by load logging pods and waits until all entries are
|
||||
// ingested. If timeout is reached, fraction is lost logs up to slack is
|
||||
// considered tolerable.
|
||||
func NewFullIngestionPodLogChecker(p LogProvider, slack float64, pods ...FiniteLoggingPod) LogChecker {
|
||||
podsMap := map[string]FiniteLoggingPod{}
|
||||
for _, p := range pods {
|
||||
podsMap[p.Name()] = p
|
||||
}
|
||||
return NewNumberedLogChecker(p, getFullIngestionPred(podsMap),
|
||||
getFullIngestionTimeout(podsMap, slack), getFiniteLoggingPodNames(pods)...)
|
||||
}
|
||||
|
||||
func getFullIngestionPred(podsMap map[string]FiniteLoggingPod) NumberedIngestionPred {
|
||||
return func(name string, occ map[int]bool) (bool, error) {
|
||||
p := podsMap[name]
|
||||
ok := len(occ) == p.ExpectedLineCount()
|
||||
return ok, nil
|
||||
}
|
||||
}
|
||||
|
||||
func getFullIngestionTimeout(podsMap map[string]FiniteLoggingPod, slack float64) NumberedTimeoutFun {
|
||||
return func(names []string, occs map[string]map[int]bool) error {
|
||||
totalGot, totalWant := 0, 0
|
||||
lossMsgs := []string{}
|
||||
for _, name := range names {
|
||||
got := len(occs[name])
|
||||
want := podsMap[name].ExpectedLineCount()
|
||||
if got != want {
|
||||
lossMsg := fmt.Sprintf("%s: %d lines", name, want-got)
|
||||
lossMsgs = append(lossMsgs, lossMsg)
|
||||
}
|
||||
totalGot += got
|
||||
totalWant += want
|
||||
}
|
||||
if len(lossMsgs) > 0 {
|
||||
framework.Logf("Still missing logs from:\n%s", strings.Join(lossMsgs, "\n"))
|
||||
}
|
||||
lostFrac := 1 - float64(totalGot)/float64(totalWant)
|
||||
if lostFrac > slack {
|
||||
return fmt.Errorf("still missing %.2f%% of logs, only %.2f%% is tolerable",
|
||||
lostFrac*100, slack*100)
|
||||
}
|
||||
framework.Logf("Missing %.2f%% of logs, which is lower than the threshold %.2f%%",
|
||||
lostFrac*100, slack*100)
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
// WaitForLogs checks that logs are ingested, as reported by the log checker
|
||||
// until the timeout has passed. Function sleeps for interval between two
|
||||
// log ingestion checks.
|
||||
func WaitForLogs(c LogChecker, interval, timeout time.Duration) error {
|
||||
err := wait.Poll(interval, timeout, func() (bool, error) {
|
||||
return c.EntriesIngested()
|
||||
})
|
||||
if err == wait.ErrWaitTimeout {
|
||||
return c.Timeout()
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func getFiniteLoggingPodNames(pods []FiniteLoggingPod) []string {
|
||||
names := []string{}
|
||||
for _, p := range pods {
|
||||
names = append(names, p.Name())
|
||||
}
|
||||
return names
|
||||
}
|
Loading…
Reference in New Issue
Block a user