diff --git a/test/e2e/cluster-logging/BUILD b/test/e2e/cluster-logging/BUILD
index 5bfc9351ce7..0e99935a4d5 100644
--- a/test/e2e/cluster-logging/BUILD
+++ b/test/e2e/cluster-logging/BUILD
@@ -14,7 +14,7 @@ go_library(
         "es_utils.go",
         "sd.go",
         "sd_events.go",
-        "sd_load.go",
+        "sd_soak.go",
         "sd_utils.go",
         "utils.go",
     ],
diff --git a/test/e2e/cluster-logging/sd.go b/test/e2e/cluster-logging/sd.go
index f30f9777739..65a8ac66c14 100644
--- a/test/e2e/cluster-logging/sd.go
+++ b/test/e2e/cluster-logging/sd.go
@@ -26,24 +26,24 @@ import (
 	. "github.com/onsi/ginkgo"
 )
 
-var _ = framework.KubeDescribe("Cluster level logging using GCL", func() {
-	f := framework.NewDefaultFramework("gcl-logging")
+var _ = framework.KubeDescribe("Cluster level logging implemented by Stackdriver", func() {
+	f := framework.NewDefaultFramework("sd-logging")
 
 	BeforeEach(func() {
 		framework.SkipUnlessProviderIs("gce", "gke")
 	})
 
-	It("should check that logs from containers are ingested in GCL", func() {
+	It("should ingest logs from applications", func() {
 		podName := "synthlogger"
 
-		gclLogsProvider, err := newGclLogsProvider(f)
-		framework.ExpectNoError(err, "Failed to create GCL logs provider")
+		sdLogsProvider, err := newSdLogsProvider(f)
+		framework.ExpectNoError(err, "Failed to create Stackdriver logs provider")
 
-		err = gclLogsProvider.Init()
-		defer gclLogsProvider.Cleanup()
-		framework.ExpectNoError(err, "Failed to init GCL logs provider")
+		err = sdLogsProvider.Init()
+		defer sdLogsProvider.Cleanup()
+		framework.ExpectNoError(err, "Failed to init Stackdriver logs provider")
 
-		err = ensureSingleFluentdOnEachNode(f, gclLogsProvider.FluentdApplicationName())
+		err = ensureSingleFluentdOnEachNode(f, sdLogsProvider.FluentdApplicationName())
 		framework.ExpectNoError(err, "Fluentd deployed incorrectly")
 
 		By("Running synthetic logger")
@@ -54,7 +54,7 @@ var _ = framework.KubeDescribe("Cluster level logging using GCL", func() {
 
 		By("Waiting for logs to ingest")
 		config := &loggingTestConfig{
-			LogsProvider:              gclLogsProvider,
+			LogsProvider:              sdLogsProvider,
 			Pods:                      []*loggingPod{pod},
 			IngestionTimeout:          10 * time.Minute,
 			MaxAllowedLostFraction:    0,
diff --git a/test/e2e/cluster-logging/sd_events.go b/test/e2e/cluster-logging/sd_events.go
index 913ab18e12e..2c81b3887c9 100644
--- a/test/e2e/cluster-logging/sd_events.go
+++ b/test/e2e/cluster-logging/sd_events.go
@@ -40,26 +40,26 @@ const (
 	eventCreationInterval = 10 * time.Second
 )
 
-var _ = framework.KubeDescribe("Cluster level logging using GCL", func() {
-	f := framework.NewDefaultFramework("gcl-logging-events")
+var _ = framework.KubeDescribe("Cluster level logging implemented by Stackdriver", func() {
+	f := framework.NewDefaultFramework("sd-logging-events")
 
 	BeforeEach(func() {
 		framework.SkipUnlessProviderIs("gce", "gke")
 	})
 
 	It("should ingest events", func() {
-		gclLogsProvider, err := newGclLogsProvider(f)
-		framework.ExpectNoError(err, "Failed to create GCL logs provider")
+		sdLogsProvider, err := newSdLogsProvider(f)
+		framework.ExpectNoError(err, "Failed to create Stackdriver logs provider")
 
-		err = gclLogsProvider.Init()
-		defer gclLogsProvider.Cleanup()
-		framework.ExpectNoError(err, "Failed to init GCL logs provider")
+		err = sdLogsProvider.Init()
+		defer sdLogsProvider.Cleanup()
+		framework.ExpectNoError(err, "Failed to init Stackdriver logs provider")
 
 		stopCh := make(chan struct{})
 		successCh := make(chan struct{})
 		go func() {
 			wait.Poll(eventPollingInterval, eventsIngestionTimeout, func() (bool, error) {
-				events := gclLogsProvider.ReadEvents()
+				events := sdLogsProvider.ReadEvents()
 				if len(events) > 0 {
 					framework.Logf("Some events are ingested, sample event: %v", events[0])
 					close(successCh)
diff --git a/test/e2e/cluster-logging/sd_load.go b/test/e2e/cluster-logging/sd_load.go
deleted file mode 100644
index c6ff8f4103a..00000000000
--- a/test/e2e/cluster-logging/sd_load.go
+++ /dev/null
@@ -1,125 +0,0 @@
-/*
-Copyright 2016 The Kubernetes Authors.
-
-Licensed under the Apache License, Version 2.0 (the "License");
-you may not use this file except in compliance with the License.
-You may obtain a copy of the License at
-
-    http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-*/
-
-package e2e
-
-import (
-	"fmt"
-	"time"
-
-	meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
-	"k8s.io/kubernetes/test/e2e/framework"
-
-	. "github.com/onsi/ginkgo"
-)
-
-const (
-	loadTestMaxAllowedLostFraction    = 0.01
-	loadTestMaxAllowedFluentdRestarts = 1
-)
-
-var _ = framework.KubeDescribe("Cluster level logging using GCL [Feature:StackdriverLogging]", func() {
-	f := framework.NewDefaultFramework("gcl-logging-load")
-
-	It("should create a constant load with long-living pods and ensure logs delivery", func() {
-		gclLogsProvider, err := newGclLogsProvider(f)
-		framework.ExpectNoError(err, "Failed to create GCL logs provider")
-
-		err = gclLogsProvider.Init()
-		defer gclLogsProvider.Cleanup()
-		framework.ExpectNoError(err, "Failed to init GCL logs provider")
-
-		nodes := framework.GetReadySchedulableNodesOrDie(f.ClientSet).Items
-		nodeCount := len(nodes)
-		podCount := 30 * nodeCount
-		loggingDuration := 10 * time.Minute
-		linesPerSecond := 1000 * nodeCount
-		linesPerPod := linesPerSecond * int(loggingDuration.Seconds()) / podCount
-		ingestionTimeout := 20 * time.Minute
-
-		By("Running logs generator pods")
-		pods := []*loggingPod{}
-		for podIdx := 0; podIdx < podCount; podIdx++ {
-			node := nodes[podIdx%len(nodes)]
-			podName := fmt.Sprintf("logs-generator-%d-%d", linesPerPod, podIdx)
-			pods = append(pods, createLoggingPod(f, podName, node.Name, linesPerPod, loggingDuration))
-
-			defer f.PodClient().Delete(podName, &meta_v1.DeleteOptions{})
-		}
-
-		By("Waiting for all log lines to be ingested")
-		config := &loggingTestConfig{
-			LogsProvider:              gclLogsProvider,
-			Pods:                      pods,
-			IngestionTimeout:          ingestionTimeout,
-			MaxAllowedLostFraction:    loadTestMaxAllowedLostFraction,
-			MaxAllowedFluentdRestarts: loadTestMaxAllowedFluentdRestarts,
-		}
-		err = waitForFullLogsIngestion(f, config)
-		if err != nil {
-			framework.Failf("Failed to ingest logs: %v", err)
-		} else {
-			framework.Logf("Successfully ingested all logs")
-		}
-	})
-
-	It("should create a constant load with short-living pods and ensure logs delivery", func() {
-		gclLogsProvider, err := newGclLogsProvider(f)
-		framework.ExpectNoError(err, "Failed to create GCL logs provider")
-
-		err = gclLogsProvider.Init()
-		defer gclLogsProvider.Cleanup()
-		framework.ExpectNoError(err, "Failed to init GCL logs provider")
-
-		nodes := framework.GetReadySchedulableNodesOrDie(f.ClientSet).Items
-		maxPodCount := 10
-		jobDuration := 1 * time.Minute
-		linesPerPodPerSecond := 100
-		testDuration := 10 * time.Minute
-		ingestionTimeout := 20 * time.Minute
-
-		podRunDelay := time.Duration(int64(jobDuration) / int64(maxPodCount))
-		podRunCount := int(testDuration.Seconds())/int(podRunDelay.Seconds()) - 1
-		linesPerPod := linesPerPodPerSecond * int(jobDuration.Seconds())
-
-		By("Running short-living pods")
-		pods := []*loggingPod{}
-		for runIdx := 0; runIdx < podRunCount; runIdx++ {
-			for nodeIdx, node := range nodes {
-				podName := fmt.Sprintf("job-logs-generator-%d-%d-%d-%d", maxPodCount, linesPerPod, runIdx, nodeIdx)
-				pods = append(pods, createLoggingPod(f, podName, node.Name, linesPerPod, jobDuration))
-
-				defer f.PodClient().Delete(podName, &meta_v1.DeleteOptions{})
-			}
-			time.Sleep(podRunDelay)
-		}
-
-		By("Waiting for all log lines to be ingested")
-		config := &loggingTestConfig{
-			LogsProvider:              gclLogsProvider,
-			Pods:                      pods,
-			IngestionTimeout:          ingestionTimeout,
-			MaxAllowedLostFraction:    loadTestMaxAllowedLostFraction,
-			MaxAllowedFluentdRestarts: loadTestMaxAllowedFluentdRestarts,
-		}
-		err = waitForFullLogsIngestion(f, config)
-		if err != nil {
-			framework.Failf("Failed to ingest logs: %v", err)
-		} else {
-			framework.Logf("Successfully ingested all logs")
-		}
-	})
-})
diff --git a/test/e2e/cluster-logging/sd_soak.go b/test/e2e/cluster-logging/sd_soak.go
new file mode 100644
index 00000000000..3d8f9485a79
--- /dev/null
+++ b/test/e2e/cluster-logging/sd_soak.go
@@ -0,0 +1,90 @@
+/*
+Copyright 2016 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package e2e
+
+import (
+	"fmt"
+	"math"
+	"time"
+
+	meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/kubernetes/test/e2e/framework"
+
+	. "github.com/onsi/ginkgo"
+)
+
+const (
+	// maxAllowedLostFraction is the fraction of lost logs considered acceptable.
+	maxAllowedLostFraction = 0.01
+	// maxAllowedRestartsPerHour is the number of fluentd container restarts
+	// considered acceptable. Once per hour is fine for now, as long as it
+	// doesn't lose too many logs.
+	maxAllowedRestartsPerHour = 1.0
+)
+
+var _ = framework.KubeDescribe("Cluster level logging implemented by Stackdriver [Feature:StackdriverLogging] [Soak]", func() {
+	f := framework.NewDefaultFramework("sd-logging-load")
+
+	It("should ingest logs from applications running for a prolonged amount of time", func() {
+		sdLogsProvider, err := newSdLogsProvider(f)
+		framework.ExpectNoError(err, "Failed to create Stackdriver logs provider")
+
+		err = sdLogsProvider.Init()
+		defer sdLogsProvider.Cleanup()
+		framework.ExpectNoError(err, "Failed to init Stackdriver logs provider")
+
+		nodes := framework.GetReadySchedulableNodesOrDie(f.ClientSet).Items
+		maxPodCount := 10
+		jobDuration := 1 * time.Hour
+		linesPerPodPerSecond := 100
+		testDuration := 21 * time.Hour
+		ingestionTimeout := testDuration + 30*time.Minute
+		allowedRestarts := int(math.Ceil(float64(testDuration) /
+			float64(time.Hour) * maxAllowedRestartsPerHour))
+
+		podRunDelay := time.Duration(int64(jobDuration) / int64(maxPodCount))
+		podRunCount := int(testDuration.Seconds())/int(podRunDelay.Seconds()) - 1
+		linesPerPod := linesPerPodPerSecond * int(jobDuration.Seconds())
+
+		By("Running short-living pods")
+		pods := []*loggingPod{}
+		for runIdx := 0; runIdx < podRunCount; runIdx++ {
+			for nodeIdx, node := range nodes {
+				podName := fmt.Sprintf("job-logs-generator-%d-%d-%d-%d", maxPodCount, linesPerPod, runIdx, nodeIdx)
+				pods = append(pods, createLoggingPod(f, podName, node.Name, linesPerPod, jobDuration))
+
+				defer f.PodClient().Delete(podName, &meta_v1.DeleteOptions{})
+			}
+			time.Sleep(podRunDelay)
+		}
+
+		By("Waiting for all log lines to be ingested")
+		config := &loggingTestConfig{
+			LogsProvider:              sdLogsProvider,
+			Pods:                      pods,
+			IngestionTimeout:          ingestionTimeout,
+			MaxAllowedLostFraction:    maxAllowedLostFraction,
+			MaxAllowedFluentdRestarts: allowedRestarts,
+		}
+		err = waitForFullLogsIngestion(f, config)
+		if err != nil {
+			framework.Failf("Failed to ingest logs: %v", err)
+		} else {
+			framework.Logf("Successfully ingested all logs")
+		}
+	})
+})
diff --git a/test/e2e/cluster-logging/sd_utils.go b/test/e2e/cluster-logging/sd_utils.go
index d64d9a293c1..f5be7191f1f 100644
--- a/test/e2e/cluster-logging/sd_utils.go
+++ b/test/e2e/cluster-logging/sd_utils.go
@@ -28,14 +28,14 @@ import (
 	"golang.org/x/net/context"
 	"golang.org/x/oauth2/google"
 
-	gcl "google.golang.org/api/logging/v2beta1"
+	sd "google.golang.org/api/logging/v2beta1"
 	pubsub "google.golang.org/api/pubsub/v1"
 )
 
 const (
-	// The amount of time to wait before considering
-	// Stackdriver Logging sink operational
-	sinkInitialDelay = 1 * time.Minute
+	// The amount of time to wait for Stackdriver Logging
+	// sink to become operational
+	sinkStartupTimeout = 10 * time.Minute
 
 	// The limit on the number of messages to pull from PubSub
 	maxPullLogMessages = 100 * 1000
@@ -44,26 +44,26 @@ const (
 	maxCacheSize = 10 * 1000
 
 	// PubSub topic with log entries polling interval
-	gclLoggingPollInterval = 100 * time.Millisecond
+	sdLoggingPollInterval = 100 * time.Millisecond
 )
 
-type gclLogsProvider struct {
-	GclService         *gcl.Service
+type sdLogsProvider struct {
+	SdService          *sd.Service
 	PubsubService      *pubsub.Service
 	Framework          *framework.Framework
 	Topic              *pubsub.Topic
 	Subscription       *pubsub.Subscription
-	LogSink            *gcl.LogSink
+	LogSink            *sd.LogSink
 	LogEntryCache      map[string]chan logEntry
 	EventCache         chan map[string]interface{}
 	CacheMutex         *sync.Mutex
 	PollingStopChannel chan struct{}
 }
 
-func newGclLogsProvider(f *framework.Framework) (*gclLogsProvider, error) {
+func newSdLogsProvider(f *framework.Framework) (*sdLogsProvider, error) {
 	ctx := context.Background()
-	hc, err := google.DefaultClient(ctx, gcl.CloudPlatformScope)
-	gclService, err := gcl.New(hc)
+	hc, err := google.DefaultClient(ctx, sd.CloudPlatformScope)
+	sdService, err := sd.New(hc)
 	if err != nil {
 		return nil, err
 	}
@@ -73,8 +73,8 @@ func newGclLogsProvider(f *framework.Framework) (*gclLogsProvider, error) {
 		return nil, err
 	}
 
-	provider := &gclLogsProvider{
-		GclService:    gclService,
+	provider := &sdLogsProvider{
+		SdService:     sdService,
 		PubsubService: pubsubService,
 		Framework:     f,
 		LogEntryCache: map[string]chan logEntry{},
@@ -85,131 +85,144 @@ func newGclLogsProvider(f *framework.Framework) (*gclLogsProvider, error) {
 	return provider, nil
 }
 
-func (gclLogsProvider *gclLogsProvider) Init() error {
+func (sdLogsProvider *sdLogsProvider) Init() error {
 	projectId := framework.TestContext.CloudConfig.ProjectID
-	nsName := gclLogsProvider.Framework.Namespace.Name
+	nsName := sdLogsProvider.Framework.Namespace.Name
 
-	topic, err := gclLogsProvider.createPubSubTopic(projectId, nsName)
+	topic, err := sdLogsProvider.createPubSubTopic(projectId, nsName)
 	if err != nil {
 		return fmt.Errorf("failed to create PubSub topic: %v", err)
 	}
-	gclLogsProvider.Topic = topic
+	sdLogsProvider.Topic = topic
 
-	subs, err := gclLogsProvider.createPubSubSubscription(projectId, nsName, topic.Name)
+	subs, err := sdLogsProvider.createPubSubSubscription(projectId, nsName, topic.Name)
 	if err != nil {
 		return fmt.Errorf("failed to create PubSub subscription: %v", err)
 	}
-	gclLogsProvider.Subscription = subs
+	sdLogsProvider.Subscription = subs
 
-	logSink, err := gclLogsProvider.createGclLogSink(projectId, nsName, nsName, topic.Name)
+	logSink, err := sdLogsProvider.createSink(projectId, nsName, nsName, topic.Name)
 	if err != nil {
 		return fmt.Errorf("failed to create Stackdriver Logging sink: %v", err)
 	}
-	gclLogsProvider.LogSink = logSink
+	sdLogsProvider.LogSink = logSink
 
-	if err = gclLogsProvider.authorizeGclLogSink(); err != nil {
+	if err = sdLogsProvider.authorizeSink(); err != nil {
 		return fmt.Errorf("failed to authorize log sink: %v", err)
 	}
 
-	framework.Logf("Waiting for log sink to become operational")
-	// TODO: Replace with something more intelligent
-	time.Sleep(sinkInitialDelay)
+	if err = sdLogsProvider.waitSinkInit(); err != nil {
+		return fmt.Errorf("failed to wait for sink to become operational: %v", err)
+	}
 
-	go gclLogsProvider.pollLogs()
+	go sdLogsProvider.pollLogs()
 
 	return nil
 }
 
-func (gclLogsProvider *gclLogsProvider) createPubSubTopic(projectId, topicName string) (*pubsub.Topic, error) {
+func (sdLogsProvider *sdLogsProvider) createPubSubTopic(projectId, topicName string) (*pubsub.Topic, error) {
 	topicFullName := fmt.Sprintf("projects/%s/topics/%s", projectId, topicName)
 	topic := &pubsub.Topic{
 		Name: topicFullName,
 	}
-	return gclLogsProvider.PubsubService.Projects.Topics.Create(topicFullName, topic).Do()
+	return sdLogsProvider.PubsubService.Projects.Topics.Create(topicFullName, topic).Do()
 }
 
-func (gclLogsProvider *gclLogsProvider) createPubSubSubscription(projectId, subsName, topicName string) (*pubsub.Subscription, error) {
+func (sdLogsProvider *sdLogsProvider) createPubSubSubscription(projectId, subsName, topicName string) (*pubsub.Subscription, error) {
 	subsFullName := fmt.Sprintf("projects/%s/subscriptions/%s", projectId, subsName)
 	subs := &pubsub.Subscription{
 		Name:  subsFullName,
 		Topic: topicName,
 	}
-	return gclLogsProvider.PubsubService.Projects.Subscriptions.Create(subsFullName, subs).Do()
+	return sdLogsProvider.PubsubService.Projects.Subscriptions.Create(subsFullName, subs).Do()
 }
 
-func (gclLogsProvider *gclLogsProvider) createGclLogSink(projectId, nsName, sinkName, topicName string) (*gcl.LogSink, error) {
+func (sdLogsProvider *sdLogsProvider) createSink(projectId, nsName, sinkName, topicName string) (*sd.LogSink, error) {
 	projectDst := fmt.Sprintf("projects/%s", projectId)
 	filter := fmt.Sprintf("(resource.type=\"gke_cluster\" AND jsonPayload.kind=\"Event\" AND jsonPayload.metadata.namespace=\"%s\") OR "+
 		"(resource.type=\"container\" AND resource.labels.namespace_id=\"%s\")", nsName, nsName)
 	framework.Logf("Using the following filter for entries: %s", filter)
-	sink := &gcl.LogSink{
+	sink := &sd.LogSink{
 		Name:        sinkName,
 		Destination: fmt.Sprintf("pubsub.googleapis.com/%s", topicName),
 		Filter:      filter,
 	}
-	return gclLogsProvider.GclService.Projects.Sinks.Create(projectDst, sink).Do()
+	return sdLogsProvider.SdService.Projects.Sinks.Create(projectDst, sink).Do()
 }
 
-func (gclLogsProvider *gclLogsProvider) authorizeGclLogSink() error {
-	topicsService := gclLogsProvider.PubsubService.Projects.Topics
-	policy, err := topicsService.GetIamPolicy(gclLogsProvider.Topic.Name).Do()
+func (sdLogsProvider *sdLogsProvider) authorizeSink() error {
+	topicsService := sdLogsProvider.PubsubService.Projects.Topics
+	policy, err := topicsService.GetIamPolicy(sdLogsProvider.Topic.Name).Do()
 	if err != nil {
 		return err
 	}
 
 	binding := &pubsub.Binding{
 		Role:    "roles/pubsub.publisher",
-		Members: []string{gclLogsProvider.LogSink.WriterIdentity},
+		Members: []string{sdLogsProvider.LogSink.WriterIdentity},
 	}
 	policy.Bindings = append(policy.Bindings, binding)
 	req := &pubsub.SetIamPolicyRequest{Policy: policy}
-	if _, err = topicsService.SetIamPolicy(gclLogsProvider.Topic.Name, req).Do(); err != nil {
+	if _, err = topicsService.SetIamPolicy(sdLogsProvider.Topic.Name, req).Do(); err != nil {
 		return err
 	}
 
 	return nil
 }
 
-func (gclLogsProvider *gclLogsProvider) pollLogs() {
-	wait.PollUntil(gclLoggingPollInterval, func() (bool, error) {
-		subsName := gclLogsProvider.Subscription.Name
-		subsService := gclLogsProvider.PubsubService.Projects.Subscriptions
-		req := &pubsub.PullRequest{
-			ReturnImmediately: true,
-			MaxMessages:       maxPullLogMessages,
-		}
-		resp, err := subsService.Pull(subsName, req).Do()
+func (sdLogsProvider *sdLogsProvider) waitSinkInit() error {
+	framework.Logf("Waiting for log sink to become operational")
+	return wait.Poll(1*time.Second, sinkStartupTimeout, func() (bool, error) {
+		err := publish(sdLogsProvider.PubsubService, sdLogsProvider.Topic, "embrace eternity")
 		if err != nil {
-			framework.Logf("Failed to pull messaged from PubSub due to %v", err)
+			framework.Logf("Failed to push message to PubSub due to %v", err)
+		}
+
+		messages, err := pullAndAck(sdLogsProvider.PubsubService, sdLogsProvider.Subscription)
+		if err != nil {
+			framework.Logf("Failed to pull messages from PubSub due to %v", err)
+			return false, nil
+		}
+		if len(messages) > 0 {
+			framework.Logf("Sink %s is operational", sdLogsProvider.LogSink.Name)
+			return true, nil
+		}
+
+		return false, nil
+	})
+}
+
+func (sdLogsProvider *sdLogsProvider) pollLogs() {
+	wait.PollUntil(sdLoggingPollInterval, func() (bool, error) {
+		messages, err := pullAndAck(sdLogsProvider.PubsubService, sdLogsProvider.Subscription)
+		if err != nil {
+			framework.Logf("Failed to pull messages from PubSub due to %v", err)
 			return false, nil
 		}
 
-		ids := []string{}
-		for _, msg := range resp.ReceivedMessages {
-			ids = append(ids, msg.AckId)
-
+		for _, msg := range messages {
 			logEntryEncoded, err := base64.StdEncoding.DecodeString(msg.Message.Data)
 			if err != nil {
 				framework.Logf("Got a message from pubsub that is not base64-encoded: %s", msg.Message.Data)
 				continue
 			}
 
-			var gclLogEntry gcl.LogEntry
-			if err := json.Unmarshal(logEntryEncoded, &gclLogEntry); err != nil {
+			var sdLogEntry sd.LogEntry
+			if err := json.Unmarshal(logEntryEncoded, &sdLogEntry); err != nil {
 				framework.Logf("Failed to decode a pubsub message '%s': %v", logEntryEncoded, err)
 				continue
 			}
 
-			switch gclLogEntry.Resource.Type {
+			switch sdLogEntry.Resource.Type {
 			case "container":
-				podName := gclLogEntry.Resource.Labels["pod_id"]
-				ch := gclLogsProvider.getCacheChannel(podName)
-				ch <- logEntry{Payload: gclLogEntry.TextPayload}
+				podName := sdLogEntry.Resource.Labels["pod_id"]
+				ch := sdLogsProvider.getCacheChannel(podName)
+				ch <- logEntry{Payload: sdLogEntry.TextPayload}
 				break
 			case "gke_cluster":
-				jsonPayloadRaw, err := gclLogEntry.JsonPayload.MarshalJSON()
+				jsonPayloadRaw, err := sdLogEntry.JsonPayload.MarshalJSON()
 				if err != nil {
-					framework.Logf("Failed to get jsonPayload from LogEntry %v", gclLogEntry)
+					framework.Logf("Failed to get jsonPayload from LogEntry %v", sdLogEntry)
 					break
 				}
 				var eventObject map[string]interface{}
@@ -218,55 +231,48 @@ func (gclLogsProvider *gclLogsProvider) pollLogs() {
 					framework.Logf("Failed to deserialize jsonPayload as json object %s", string(jsonPayloadRaw[:]))
 					break
 				}
-				gclLogsProvider.EventCache <- eventObject
+				sdLogsProvider.EventCache <- eventObject
 				break
 			default:
-				framework.Logf("Received LogEntry with unexpected resource type: %s", gclLogEntry.Resource.Type)
+				framework.Logf("Received LogEntry with unexpected resource type: %s", sdLogEntry.Resource.Type)
 				break
 			}
 		}
 
-		if len(ids) > 0 {
-			ackReq := &pubsub.AcknowledgeRequest{AckIds: ids}
-			if _, err = subsService.Acknowledge(subsName, ackReq).Do(); err != nil {
-				framework.Logf("Failed to ack: %v", err)
-			}
-		}
-
 		return false, nil
-	}, gclLogsProvider.PollingStopChannel)
+	}, sdLogsProvider.PollingStopChannel)
 }
 
-func (gclLogsProvider *gclLogsProvider) Cleanup() {
-	gclLogsProvider.PollingStopChannel <- struct{}{}
+func (sdLogsProvider *sdLogsProvider) Cleanup() {
+	sdLogsProvider.PollingStopChannel <- struct{}{}
 
-	if gclLogsProvider.LogSink != nil {
+	if sdLogsProvider.LogSink != nil {
 		projectId := framework.TestContext.CloudConfig.ProjectID
-		sinkNameId := fmt.Sprintf("projects/%s/sinks/%s", projectId, gclLogsProvider.LogSink.Name)
-		sinksService := gclLogsProvider.GclService.Projects.Sinks
+		sinkNameId := fmt.Sprintf("projects/%s/sinks/%s", projectId, sdLogsProvider.LogSink.Name)
+		sinksService := sdLogsProvider.SdService.Projects.Sinks
 		if _, err := sinksService.Delete(sinkNameId).Do(); err != nil {
 			framework.Logf("Failed to delete LogSink: %v", err)
 		}
 	}
 
-	if gclLogsProvider.Subscription != nil {
-		subsService := gclLogsProvider.PubsubService.Projects.Subscriptions
-		if _, err := subsService.Delete(gclLogsProvider.Subscription.Name).Do(); err != nil {
+	if sdLogsProvider.Subscription != nil {
+		subsService := sdLogsProvider.PubsubService.Projects.Subscriptions
+		if _, err := subsService.Delete(sdLogsProvider.Subscription.Name).Do(); err != nil {
 			framework.Logf("Failed to delete PubSub subscription: %v", err)
 		}
 	}
 
-	if gclLogsProvider.Topic != nil {
-		topicsService := gclLogsProvider.PubsubService.Projects.Topics
-		if _, err := topicsService.Delete(gclLogsProvider.Topic.Name).Do(); err != nil {
+	if sdLogsProvider.Topic != nil {
+		topicsService := sdLogsProvider.PubsubService.Projects.Topics
+		if _, err := topicsService.Delete(sdLogsProvider.Topic.Name).Do(); err != nil {
 			framework.Logf("Failed to delete PubSub topic: %v", err)
 		}
 	}
 }
 
-func (gclLogsProvider *gclLogsProvider) ReadEntries(pod *loggingPod) []logEntry {
+func (sdLogsProvider *sdLogsProvider) ReadEntries(pod *loggingPod) []logEntry {
 	var entries []logEntry
-	ch := gclLogsProvider.getCacheChannel(pod.Name)
+	ch := sdLogsProvider.getCacheChannel(pod.Name)
 polling_loop:
 	for {
 		select {
@@ -279,16 +285,16 @@ polling_loop:
 	return entries
 }
 
-func (logsProvider *gclLogsProvider) FluentdApplicationName() string {
+func (logsProvider *sdLogsProvider) FluentdApplicationName() string {
 	return "fluentd-gcp"
 }
 
-func (gclLogsProvider *gclLogsProvider) ReadEvents() []map[string]interface{} {
+func (sdLogsProvider *sdLogsProvider) ReadEvents() []map[string]interface{} {
 	var events []map[string]interface{}
 polling_loop:
 	for {
 		select {
-		case event := <-gclLogsProvider.EventCache:
+		case event := <-sdLogsProvider.EventCache:
 			events = append(events, event)
 		default:
 			break polling_loop
@@ -297,15 +303,54 @@ polling_loop:
 	return events
 }
 
-func (gclLogsProvider *gclLogsProvider) getCacheChannel(podName string) chan logEntry {
-	gclLogsProvider.CacheMutex.Lock()
-	defer gclLogsProvider.CacheMutex.Unlock()
+func (sdLogsProvider *sdLogsProvider) getCacheChannel(podName string) chan logEntry {
+	sdLogsProvider.CacheMutex.Lock()
+	defer sdLogsProvider.CacheMutex.Unlock()
 
-	if ch, ok := gclLogsProvider.LogEntryCache[podName]; ok {
+	if ch, ok := sdLogsProvider.LogEntryCache[podName]; ok {
 		return ch
 	}
 
 	newCh := make(chan logEntry, maxCacheSize)
-	gclLogsProvider.LogEntryCache[podName] = newCh
+	sdLogsProvider.LogEntryCache[podName] = newCh
 	return newCh
 }
+
+func pullAndAck(service *pubsub.Service, subs *pubsub.Subscription) ([]*pubsub.ReceivedMessage, error) {
+	subsService := service.Projects.Subscriptions
+	req := &pubsub.PullRequest{
+		ReturnImmediately: true,
+		MaxMessages:       maxPullLogMessages,
+	}
+
+	resp, err := subsService.Pull(subs.Name, req).Do()
+	if err != nil {
+		return nil, err
+	}
+
+	var ids []string
+	for _, msg := range resp.ReceivedMessages {
+		ids = append(ids, msg.AckId)
+	}
+	if len(ids) > 0 {
+		ackReq := &pubsub.AcknowledgeRequest{AckIds: ids}
+		if _, err = subsService.Acknowledge(subs.Name, ackReq).Do(); err != nil {
+			framework.Logf("Failed to ack poll: %v", err)
+		}
+	}
+
+	return resp.ReceivedMessages, nil
+}
+
+func publish(service *pubsub.Service, topic *pubsub.Topic, msg string) error {
+	topicsService := service.Projects.Topics
+	req := &pubsub.PublishRequest{
+		Messages: []*pubsub.PubsubMessage{
+			{
+				Data: base64.StdEncoding.EncodeToString([]byte(msg)),
+			},
+		},
+	}
+	_, err := topicsService.Publish(topic.Name, req).Do()
+	return err
+}
diff --git a/test/e2e/cluster-logging/utils.go b/test/e2e/cluster-logging/utils.go
index 7cc0a5dc826..01190acd2ef 100644
--- a/test/e2e/cluster-logging/utils.go
+++ b/test/e2e/cluster-logging/utils.go
@@ -213,6 +213,11 @@ func waitForFullLogsIngestion(f *framework.Framework, config *loggingTestConfig)
 	if totalMissing > 0 {
 		framework.Logf("After %v still missing %d lines, %.2f%% of total number of lines",
 			config.IngestionTimeout, totalMissing, lostFraction*100)
+		for podIdx, missing := range missingByPod {
+			if missing != 0 {
+				framework.Logf("Still missing %d lines for pod %v", missing, config.Pods[podIdx])
+			}
+		}
 	}
 
 	if lostFraction > config.MaxAllowedLostFraction {