Implement Stackdriver Logging e2e tests using PubSub

Mik Vyatskov 2017-05-23 13:50:10 +02:00 committed by Mikhail Vyatskov
parent fd7c4b02fa
commit 3cada4c717
7 changed files with 268 additions and 145 deletions

View File

@@ -27,10 +27,12 @@ go_library(
"//vendor/golang.org/x/net/context:go_default_library",
"//vendor/golang.org/x/oauth2/google:go_default_library",
"//vendor/google.golang.org/api/logging/v2beta1:go_default_library",
"//vendor/google.golang.org/api/pubsub/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/resource:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/fields:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/labels:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//vendor/k8s.io/client-go/util/integer:go_default_library",
],
)

View File

@@ -39,10 +39,11 @@ var _ = framework.KubeDescribe("Cluster level logging using Elasticsearch [Featu
It("should check that logs from containers are ingested into Elasticsearch", func() {
podName := "synthlogger"
esLogsProvider, err := newEsLogsProvider(f)
framework.ExpectNoError(err, "Failed to create GCL logs provider")
framework.ExpectNoError(err, "Failed to create Elasticsearch logs provider")
err = esLogsProvider.EnsureWorking()
framework.ExpectNoError(err, "Elasticsearch is not working")
err = esLogsProvider.Init()
defer esLogsProvider.Cleanup()
framework.ExpectNoError(err, "Failed to init Elasticsearch logs provider")
err = ensureSingleFluentdOnEachNode(f, esLogsProvider.FluentdApplicationName())
framework.ExpectNoError(err, "Fluentd deployed incorrectly")

View File

@@ -46,12 +46,8 @@ func newEsLogsProvider(f *framework.Framework) (*esLogsProvider, error) {
return &esLogsProvider{Framework: f}, nil
}
func (logsProvider *esLogsProvider) FluentdApplicationName() string {
return "fluentd-es"
}
// Ensures that Elasticsearch is running and ready to serve requests
func (logsProvider *esLogsProvider) EnsureWorking() error {
func (logsProvider *esLogsProvider) Init() error {
f := logsProvider.Framework
// Check for the existence of the Elasticsearch service.
By("Checking the Elasticsearch service exists.")
@@ -157,7 +153,11 @@ func (logsProvider *esLogsProvider) EnsureWorking() error {
return nil
}
func (logsProvider *esLogsProvider) ReadEntries(pod *loggingPod) []*logEntry {
func (logsProvider *esLogsProvider) Cleanup() {
// Nothing to do
}
func (logsProvider *esLogsProvider) ReadEntries(pod *loggingPod) []logEntry {
f := logsProvider.Framework
proxyRequest, errProxy := framework.GetServicesProxyRequest(f.ClientSet, f.ClientSet.Core().RESTClient().Get())
@@ -202,7 +202,7 @@ func (logsProvider *esLogsProvider) ReadEntries(pod *loggingPod) []*logEntry {
return nil
}
entries := []*logEntry{}
entries := []logEntry{}
// Iterate over the hits and populate the observed array.
for _, e := range h {
l, ok := e.(map[string]interface{})
@@ -223,22 +223,12 @@ func (logsProvider *esLogsProvider) ReadEntries(pod *loggingPod) []*logEntry {
continue
}
timestampString, ok := source["@timestamp"].(string)
if !ok {
framework.Logf("Timestamp not of the expected type: %T", source["@timestamp"])
continue
}
timestamp, err := time.Parse(time.RFC3339, timestampString)
if err != nil {
framework.Logf("Timestamp was not in correct format: %s", timestampString)
continue
}
entries = append(entries, &logEntry{
Payload: msg,
Timestamp: timestamp,
})
entries = append(entries, logEntry{Payload: msg})
}
return entries
}
func (logsProvider *esLogsProvider) FluentdApplicationName() string {
return "fluentd-es"
}

View File

@@ -39,8 +39,9 @@ var _ = framework.KubeDescribe("Cluster level logging using GCL", func() {
gclLogsProvider, err := newGclLogsProvider(f)
framework.ExpectNoError(err, "Failed to create GCL logs provider")
err = gclLogsProvider.EnsureWorking()
framework.ExpectNoError(err, "GCL is not working")
err = gclLogsProvider.Init()
defer gclLogsProvider.Cleanup()
framework.ExpectNoError(err, "Failed to init GCL logs provider")
err = ensureSingleFluentdOnEachNode(f, gclLogsProvider.FluentdApplicationName())
framework.ExpectNoError(err, "Fluentd deployed incorrectly")

View File

@@ -38,13 +38,17 @@ var _ = framework.KubeDescribe("Cluster level logging using GCL [Feature:Stackdr
gclLogsProvider, err := newGclLogsProvider(f)
framework.ExpectNoError(err, "Failed to create GCL logs provider")
err = gclLogsProvider.Init()
defer gclLogsProvider.Cleanup()
framework.ExpectNoError(err, "Failed to init GCL logs provider")
nodes := framework.GetReadySchedulableNodesOrDie(f.ClientSet).Items
nodeCount := len(nodes)
podCount := 30 * nodeCount
loggingDuration := 10 * time.Minute
linesPerSecond := 1000 * nodeCount
linesPerPod := linesPerSecond * int(loggingDuration.Seconds()) / podCount
ingestionTimeout := 60 * time.Minute
ingestionTimeout := 20 * time.Minute
By("Running logs generator pods")
pods := []*loggingPod{}
@@ -56,9 +60,6 @@ var _ = framework.KubeDescribe("Cluster level logging using GCL [Feature:Stackdr
defer f.PodClient().Delete(podName, &meta_v1.DeleteOptions{})
}
By("Waiting for pods to succeed")
time.Sleep(loggingDuration)
By("Waiting for all log lines to be ingested")
config := &loggingTestConfig{
LogsProvider: gclLogsProvider,
@@ -79,12 +80,16 @@ var _ = framework.KubeDescribe("Cluster level logging using GCL [Feature:Stackdr
gclLogsProvider, err := newGclLogsProvider(f)
framework.ExpectNoError(err, "Failed to create GCL logs provider")
err = gclLogsProvider.Init()
defer gclLogsProvider.Cleanup()
framework.ExpectNoError(err, "Failed to init GCL logs provider")
nodes := framework.GetReadySchedulableNodesOrDie(f.ClientSet).Items
maxPodCount := 10
jobDuration := 1 * time.Minute
linesPerPodPerSecond := 100
testDuration := 10 * time.Minute
ingestionTimeout := 60 * time.Minute
ingestionTimeout := 20 * time.Minute
podRunDelay := time.Duration(int64(jobDuration) / int64(maxPodCount))
podRunCount := int(testDuration.Seconds())/int(podRunDelay.Seconds()) - 1
@@ -102,9 +107,6 @@ var _ = framework.KubeDescribe("Cluster level logging using GCL [Feature:Stackdr
time.Sleep(podRunDelay)
}
By("Waiting for the last pods to finish")
time.Sleep(jobDuration)
By("Waiting for all log lines to be ingested")
config := &loggingTestConfig{
LogsProvider: gclLogsProvider,

View File

@@ -17,36 +17,46 @@ limitations under the License.
package e2e
import (
"encoding/base64"
"encoding/json"
"fmt"
"sync"
"time"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/kubernetes/test/e2e/framework"
"golang.org/x/net/context"
"golang.org/x/oauth2/google"
gcl "google.golang.org/api/logging/v2beta1"
pubsub "google.golang.org/api/pubsub/v1"
)
const (
// GCL doesn't support page sizes greater than 1000
gclPageSize = 1000
// The amount of time to wait before considering
// the Stackdriver Logging sink operational
sinkInitialDelay = 1 * time.Minute
// If we fail to get a response from GCL, it can be a random 500 or
// a quota limit being exceeded, so we retry for some time in case the problem goes away.
// Quota is enforced every 100 seconds, so we have to wait for more than
// that to reliably get the next portion.
queryGclRetryDelay = 100 * time.Second
queryGclRetryTimeout = 250 * time.Second
// The limit on the number of messages to pull from PubSub
maxPullLogMessages = 100 * 1000
// The limit on the number of messages in the cache for a pod
maxCachedMessagesPerPod = 10 * 1000
// Polling interval for the PubSub subscription with log entries
gclLoggingPollInterval = 100 * time.Millisecond
)
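// Note on the constants above: the poller requests up to maxPullLogMessages
// per pull every gclLoggingPollInterval, and each pod's cache channel buffers
// at most maxCachedMessagesPerPod entries; a full channel blocks the poller
// until the test drains it via ReadEntries.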
type gclLogsProvider struct {
GclService *gcl.Service
Framework *framework.Framework
}
func (gclLogsProvider *gclLogsProvider) EnsureWorking() error {
// We assume that GCL is always working
return nil
GclService *gcl.Service
PubsubService *pubsub.Service
Framework *framework.Framework
Topic *pubsub.Topic
Subscription *pubsub.Subscription
LogSink *gcl.LogSink
LogEntryCache map[string]chan logEntry
CacheMutex *sync.Mutex
PollingStopChannel chan struct{}
}
func newGclLogsProvider(f *framework.Framework) (*gclLogsProvider, error) {
@@ -57,75 +67,206 @@ func newGclLogsProvider(f *framework.Framework) (*gclLogsProvider, error) {
return nil, err
}
pubsubService, err := pubsub.New(hc)
if err != nil {
return nil, err
}
provider := &gclLogsProvider{
GclService: gclService,
Framework: f,
GclService: gclService,
PubsubService: pubsubService,
Framework: f,
LogEntryCache: map[string]chan logEntry{},
CacheMutex: &sync.Mutex{},
PollingStopChannel: make(chan struct{}, 1),
}
return provider, nil
}
func (gclLogsProvider *gclLogsProvider) Init() error {
projectId := framework.TestContext.CloudConfig.ProjectID
nsName := gclLogsProvider.Framework.Namespace.Name
topic, err := gclLogsProvider.createPubSubTopic(projectId, nsName)
if err != nil {
return fmt.Errorf("failed to create PubSub topic: %v", err)
}
gclLogsProvider.Topic = topic
subs, err := gclLogsProvider.createPubSubSubscription(projectId, nsName, topic.Name)
if err != nil {
return fmt.Errorf("failed to create PubSub subscription: %v", err)
}
gclLogsProvider.Subscription = subs
logSink, err := gclLogsProvider.createGclLogSink(projectId, nsName, nsName, topic.Name)
if err != nil {
return fmt.Errorf("failed to create Stackdriver Logging sink: %v", err)
}
gclLogsProvider.LogSink = logSink
if err = gclLogsProvider.authorizeGclLogSink(); err != nil {
return fmt.Errorf("failed to authorize log sink: %v", err)
}
framework.Logf("Waiting for log sink to become operational")
// TODO: Replace with something more intelligent
time.Sleep(sinkInitialDelay)
go gclLogsProvider.pollLogs()
return nil
}
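// The Init flow above wires Stackdriver Logging to the test through PubSub:
// create a topic named after the test namespace, subscribe to it, create a
// GCL sink that exports the namespace's log entries to the topic, authorize
// the sink to publish, and finally start a background poller that fans the
// entries out into per-pod channels.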
func (gclLogsProvider *gclLogsProvider) createPubSubTopic(projectId, topicName string) (*pubsub.Topic, error) {
topicFullName := fmt.Sprintf("projects/%s/topics/%s", projectId, topicName)
topic := &pubsub.Topic{
Name: topicFullName,
}
return gclLogsProvider.PubsubService.Projects.Topics.Create(topicFullName, topic).Do()
}
func (gclLogsProvider *gclLogsProvider) createPubSubSubscription(projectId, subsName, topicName string) (*pubsub.Subscription, error) {
subsFullName := fmt.Sprintf("projects/%s/subscriptions/%s", projectId, subsName)
subs := &pubsub.Subscription{
Name: subsFullName,
Topic: topicName,
}
return gclLogsProvider.PubsubService.Projects.Subscriptions.Create(subsFullName, subs).Do()
}
func (gclLogsProvider *gclLogsProvider) createGclLogSink(projectId, nsName, sinkName, topicName string) (*gcl.LogSink, error) {
projectDst := fmt.Sprintf("projects/%s", projectId)
filter := fmt.Sprintf("resource.labels.namespace_id=%s AND resource.labels.container_name=%s", nsName, loggingContainerName)
sink := &gcl.LogSink{
Name: sinkName,
Destination: fmt.Sprintf("pubsub.googleapis.com/%s", topicName),
Filter: filter,
}
return gclLogsProvider.GclService.Projects.Sinks.Create(projectDst, sink).Do()
}
func (gclLogsProvider *gclLogsProvider) authorizeGclLogSink() error {
topicsService := gclLogsProvider.PubsubService.Projects.Topics
policy, err := topicsService.GetIamPolicy(gclLogsProvider.Topic.Name).Do()
if err != nil {
return err
}
binding := &pubsub.Binding{
Role: "roles/pubsub.publisher",
Members: []string{gclLogsProvider.LogSink.WriterIdentity},
}
policy.Bindings = append(policy.Bindings, binding)
req := &pubsub.SetIamPolicyRequest{Policy: policy}
if _, err = topicsService.SetIamPolicy(gclLogsProvider.Topic.Name, req).Do(); err != nil {
return err
}
return nil
}
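// Creating the sink returns a WriterIdentity: the service account that
// Stackdriver Logging uses to write exported entries. Until that identity
// is granted roles/pubsub.publisher on the topic, as done above, no entries
// reach PubSub.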
func (gclLogsProvider *gclLogsProvider) pollLogs() {
wait.PollUntil(gclLoggingPollInterval, func() (bool, error) {
subsName := gclLogsProvider.Subscription.Name
subsService := gclLogsProvider.PubsubService.Projects.Subscriptions
req := &pubsub.PullRequest{
ReturnImmediately: true,
MaxMessages: maxPullLogMessages,
}
resp, err := subsService.Pull(subsName, req).Do()
if err != nil {
framework.Logf("Failed to pull messaged from PubSub due to %v", err)
return false, nil
}
ids := []string{}
for _, msg := range resp.ReceivedMessages {
ids = append(ids, msg.AckId)
logEntryEncoded, err := base64.StdEncoding.DecodeString(msg.Message.Data)
if err != nil {
framework.Logf("Got a message from pubsub that is not base64-encoded: %s", msg.Message.Data)
continue
}
var gclLogEntry gcl.LogEntry
if err := json.Unmarshal(logEntryEncoded, &gclLogEntry); err != nil {
framework.Logf("Failed to decode a pubsub message '%s': %v", logEntryEncoded, err)
continue
}
podName := gclLogEntry.Resource.Labels["pod_id"]
ch := gclLogsProvider.getCacheChannel(podName)
ch <- logEntry{Payload: gclLogEntry.TextPayload}
}
if len(ids) > 0 {
ackReq := &pubsub.AcknowledgeRequest{AckIds: ids}
if _, err = subsService.Acknowledge(subsName, ackReq).Do(); err != nil {
framework.Logf("Failed to ack: %v", err)
}
}
return false, nil
}, gclLogsProvider.PollingStopChannel)
}
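// Because the pull request sets ReturnImmediately, an empty response is
// normal and the condition simply returns false so that wait.PollUntil keeps
// looping until Cleanup signals PollingStopChannel. Each message's Data field
// carries base64-encoded JSON of a gcl.LogEntry, hence the decode/unmarshal
// steps before an entry is routed to its pod's channel via the "pod_id"
// resource label; all pulled messages are acked in one batch at the end of
// each poll.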
func (gclLogsProvider *gclLogsProvider) Cleanup() {
gclLogsProvider.PollingStopChannel <- struct{}{}
if gclLogsProvider.LogSink != nil {
projectId := framework.TestContext.CloudConfig.ProjectID
sinkNameId := fmt.Sprintf("projects/%s/sinks/%s", projectId, gclLogsProvider.LogSink.Name)
sinksService := gclLogsProvider.GclService.Projects.Sinks
if _, err := sinksService.Delete(sinkNameId).Do(); err != nil {
framework.Logf("Failed to delete LogSink: %v", err)
}
}
if gclLogsProvider.Subscription != nil {
subsService := gclLogsProvider.PubsubService.Projects.Subscriptions
if _, err := subsService.Delete(gclLogsProvider.Subscription.Name).Do(); err != nil {
framework.Logf("Failed to delete PubSub subscription: %v", err)
}
}
if gclLogsProvider.Topic != nil {
topicsService := gclLogsProvider.PubsubService.Projects.Topics
if _, err := topicsService.Delete(gclLogsProvider.Topic.Name).Do(); err != nil {
framework.Logf("Failed to delete PubSub topic: %v", err)
}
}
}
func (gclLogsProvider *gclLogsProvider) ReadEntries(pod *loggingPod) []logEntry {
var entries []logEntry
ch := gclLogsProvider.getCacheChannel(pod.Name)
polling_loop:
for {
select {
case entry := <-ch:
entries = append(entries, entry)
default:
break polling_loop
}
}
return entries
}
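// ReadEntries drains the pod's channel without blocking: the default case
// exits the labeled loop as soon as no buffered entry is immediately
// available, so each call returns only what the poller has ingested so far.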
func (logsProvider *gclLogsProvider) FluentdApplicationName() string {
return "fluentd-gcp"
}
// Since the GCL API is not easily available from outside the cluster,
// we use the gcloud command to perform search with a filter
func (gclLogsProvider *gclLogsProvider) ReadEntries(pod *loggingPod) []*logEntry {
filter := fmt.Sprintf("resource.labels.pod_id=%s AND resource.labels.namespace_id=%s AND timestamp>=\"%v\"",
pod.Name, gclLogsProvider.Framework.Namespace.Name, pod.LastTimestamp.Format(time.RFC3339))
framework.Logf("Reading entries from GCL with filter '%v'", filter)
func (gclLogsProvider *gclLogsProvider) getCacheChannel(podName string) chan logEntry {
gclLogsProvider.CacheMutex.Lock()
defer gclLogsProvider.CacheMutex.Unlock()
response := getResponseSafe(gclLogsProvider.GclService, filter, "")
var entries []*logEntry
for response != nil && len(response.Entries) > 0 {
framework.Logf("Received %d entries from GCL", len(response.Entries))
for _, entry := range response.Entries {
if entry.TextPayload == "" {
continue
}
timestamp, parseErr := time.Parse(time.RFC3339, entry.Timestamp)
if parseErr != nil {
continue
}
entries = append(entries, &logEntry{
Timestamp: timestamp,
Payload: entry.TextPayload,
})
}
nextToken := response.NextPageToken
if nextToken == "" {
break
}
response = getResponseSafe(gclLogsProvider.GclService, filter, response.NextPageToken)
if ch, ok := gclLogsProvider.LogEntryCache[podName]; ok {
return ch
}
return entries
}
func getResponseSafe(gclService *gcl.Service, filter string, pageToken string) *gcl.ListLogEntriesResponse {
for start := time.Now(); time.Since(start) < queryGclRetryTimeout; time.Sleep(queryGclRetryDelay) {
response, err := gclService.Entries.List(&gcl.ListLogEntriesRequest{
ProjectIds: []string{
framework.TestContext.CloudConfig.ProjectID,
},
OrderBy: "timestamp desc",
Filter: filter,
PageSize: int64(gclPageSize),
PageToken: pageToken,
}).Do()
if err == nil {
return response
}
framework.Logf("Failed to get response from GCL due to %v, retrying", err)
}
return nil
newCh := make(chan logEntry, maxCachedMessagesPerPod)
gclLogsProvider.LogEntryCache[podName] = newCh
return newCh
}
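// Channels are created lazily under CacheMutex, so the polling goroutine and
// the test can request a pod's channel in either order. A rough usage sketch,
// mirroring the tests in this commit:
//
//	gclLogsProvider, err := newGclLogsProvider(f)
//	framework.ExpectNoError(err, "Failed to create GCL logs provider")
//	err = gclLogsProvider.Init()
//	defer gclLogsProvider.Cleanup()
//	framework.ExpectNoError(err, "Failed to init GCL logs provider")
//	entries := gclLogsProvider.ReadEntries(pod) // non-blocking drain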

View File

@@ -33,13 +33,16 @@ import (
const (
// Delay between two consecutive attempts to check if all logs have been ingested
ingestionRetryDelay = 100 * time.Second
ingestionRetryDelay = 30 * time.Second
// CPU request for the logging container, in millicores
loggingContainerCpuRequest = 10
// Memory request for the logging container, in bytes
loggingContainerMemoryRequest = 10 * 1024 * 1024
// Name of the container used for logging tests
loggingContainerName = "logging-container"
)
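// Note: loggingContainerName is also what the GCL sink filter matches on
// (resource.labels.container_name), so only entries produced by the
// generator containers are exported to the PubSub topic.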
var (
@@ -51,26 +54,21 @@ var (
type loggingPod struct {
// Name of the pod
Name string
// If some log entries haven't been read yet, their
// timestamps should be no earlier than this timestamp;
// effectively, this is the timestamp of the last ingested
// entry before which no entries are missing
LastTimestamp time.Time
// Cache of ingested and read entries
Occurrences map[int]*logEntry
Occurrences map[int]logEntry
// Number of lines expected to be ingested from this pod
ExpectedLinesNumber int
}
type logEntry struct {
Payload string
Timestamp time.Time
Payload string
}
type logsProvider interface {
Init() error
Cleanup()
ReadEntries(*loggingPod) []logEntry
FluentdApplicationName() string
EnsureWorking() error
ReadEntries(*loggingPod) []*logEntry
}
type loggingTestConfig struct {
@@ -81,7 +79,7 @@ type loggingTestConfig struct {
MaxAllowedFluentdRestarts int
}
func (entry *logEntry) getLogEntryNumber() (int, bool) {
func (entry logEntry) getLogEntryNumber() (int, bool) {
submatch := logEntryMessageRegex.FindStringSubmatch(entry.Payload)
if submatch == nil || len(submatch) < 2 {
return 0, false
@@ -96,10 +94,8 @@ func createLoggingPod(f *framework.Framework, podName string, nodeName string, t
createLogsGeneratorPod(f, podName, nodeName, totalLines, loggingDuration)
return &loggingPod{
Name: podName,
// It's used to avoid querying logs from before the pod was started
LastTimestamp: time.Now(),
Occurrences: make(map[int]*logEntry),
Name: podName,
Occurrences: make(map[int]logEntry),
ExpectedLinesNumber: totalLines,
}
}
@@ -113,7 +109,7 @@ func createLogsGeneratorPod(f *framework.Framework, podName string, nodeName str
RestartPolicy: api_v1.RestartPolicyNever,
Containers: []api_v1.Container{
{
Name: podName,
Name: loggingContainerName,
Image: "gcr.io/google_containers/logs-generator:v0.1.0",
Env: []api_v1.EnvVar{
{
@@ -146,7 +142,7 @@ func waitForSomeLogs(f *framework.Framework, config *loggingTestConfig) error {
podHasIngestedLogs := make([]bool, len(config.Pods))
podWithIngestedLogsCount := 0
for start := time.Now(); podWithIngestedLogsCount < len(config.Pods) && time.Since(start) < config.IngestionTimeout; time.Sleep(ingestionRetryDelay) {
for start := time.Now(); time.Since(start) < config.IngestionTimeout; time.Sleep(ingestionRetryDelay) {
for podIdx, pod := range config.Pods {
if podHasIngestedLogs[podIdx] {
continue
@@ -167,6 +163,10 @@ }
}
}
}
if podWithIngestedLogsCount == len(config.Pods) {
break
}
}
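// Breaking out of the loop as soon as every pod has ingested some logs,
// instead of encoding that in the for condition, skips the trailing
// ingestionRetryDelay sleep; waitForFullLogsIngestion below uses the same
// pattern.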
if podWithIngestedLogsCount < len(config.Pods) {
@@ -189,7 +189,7 @@ func waitForFullLogsIngestion(f *framework.Framework, config *loggingTestConfig)
missingByPod[podIdx] = pod.ExpectedLinesNumber
}
for start := time.Now(); totalMissing > 0 && time.Since(start) < config.IngestionTimeout; time.Sleep(ingestionRetryDelay) {
for start := time.Now(); time.Since(start) < config.IngestionTimeout; time.Sleep(ingestionRetryDelay) {
missing := 0
for podIdx, pod := range config.Pods {
if missingByPod[podIdx] == 0 {
@@ -203,6 +203,8 @@ func waitForFullLogsIngestion(f *framework.Framework, config *loggingTestConfig)
totalMissing = missing
if totalMissing > 0 {
framework.Logf("Still missing %d lines in total", totalMissing)
} else {
break
}
}
@@ -245,19 +247,14 @@ func pullMissingLogsCount(logsProvider logsProvider, pod *loggingPod) int {
if err != nil {
framework.Logf("Failed to get missing lines count from pod %s due to %v", pod.Name, err)
return pod.ExpectedLinesNumber
} else if missingOnPod > 0 {
framework.Logf("Pod %s is missing %d lines", pod.Name, missingOnPod)
} else {
framework.Logf("All logs from pod %s are ingested", pod.Name)
}
return missingOnPod
}
func getMissingLinesCount(logsProvider logsProvider, pod *loggingPod) (int, error) {
entries := logsProvider.ReadEntries(pod)
framework.Logf("Got %d entries from provider", len(entries))
for _, entry := range entries {
lineNumber, ok := entry.getLogEntryNumber()
if !ok {
@@ -271,17 +268,6 @@ func getMissingLinesCount(logsProvider logsProvider, pod *loggingPod) (int, erro
}
}
for i := 0; i < pod.ExpectedLinesNumber; i++ {
entry, ok := pod.Occurrences[i]
if !ok {
break
}
if entry.Timestamp.After(pod.LastTimestamp) {
pod.LastTimestamp = entry.Timestamp
}
}
return pod.ExpectedLinesNumber - len(pod.Occurrences), nil
}