diff --git a/test/e2e/cluster-logging/BUILD b/test/e2e/cluster-logging/BUILD index 318290979c3..0cc07971722 100644 --- a/test/e2e/cluster-logging/BUILD +++ b/test/e2e/cluster-logging/BUILD @@ -27,10 +27,12 @@ go_library( "//vendor/golang.org/x/net/context:go_default_library", "//vendor/golang.org/x/oauth2/google:go_default_library", "//vendor/google.golang.org/api/logging/v2beta1:go_default_library", + "//vendor/google.golang.org/api/pubsub/v1:go_default_library", "//vendor/k8s.io/apimachinery/pkg/api/resource:go_default_library", "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//vendor/k8s.io/apimachinery/pkg/fields:go_default_library", "//vendor/k8s.io/apimachinery/pkg/labels:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library", "//vendor/k8s.io/client-go/util/integer:go_default_library", ], ) diff --git a/test/e2e/cluster-logging/es.go b/test/e2e/cluster-logging/es.go index 89ce04cd856..bf3488d4abe 100644 --- a/test/e2e/cluster-logging/es.go +++ b/test/e2e/cluster-logging/es.go @@ -39,10 +39,11 @@ var _ = framework.KubeDescribe("Cluster level logging using Elasticsearch [Featu It("should check that logs from containers are ingested into Elasticsearch", func() { podName := "synthlogger" esLogsProvider, err := newEsLogsProvider(f) - framework.ExpectNoError(err, "Failed to create GCL logs provider") + framework.ExpectNoError(err, "Failed to create Elasticsearch logs provider") - err = esLogsProvider.EnsureWorking() - framework.ExpectNoError(err, "Elasticsearch is not working") + err = esLogsProvider.Init() + defer esLogsProvider.Cleanup() + framework.ExpectNoError(err, "Failed to init Elasticsearch logs provider") err = ensureSingleFluentdOnEachNode(f, esLogsProvider.FluentdApplicationName()) framework.ExpectNoError(err, "Fluentd deployed incorrectly") diff --git a/test/e2e/cluster-logging/es_utils.go b/test/e2e/cluster-logging/es_utils.go index b521d661ffd..d28237a8202 100644 --- a/test/e2e/cluster-logging/es_utils.go +++ b/test/e2e/cluster-logging/es_utils.go @@ -46,12 +46,8 @@ func newEsLogsProvider(f *framework.Framework) (*esLogsProvider, error) { return &esLogsProvider{Framework: f}, nil } -func (logsProvider *esLogsProvider) FluentdApplicationName() string { - return "fluentd-es" -} - // Ensures that elasticsearch is running and ready to serve requests -func (logsProvider *esLogsProvider) EnsureWorking() error { +func (logsProvider *esLogsProvider) Init() error { f := logsProvider.Framework // Check for the existence of the Elasticsearch service. By("Checking the Elasticsearch service exists.") @@ -157,7 +153,11 @@ func (logsProvider *esLogsProvider) EnsureWorking() error { return nil } -func (logsProvider *esLogsProvider) ReadEntries(pod *loggingPod) []*logEntry { +func (logsProvider *esLogsProvider) Cleanup() { + // Nothing to do +} + +func (logsProvider *esLogsProvider) ReadEntries(pod *loggingPod) []logEntry { f := logsProvider.Framework proxyRequest, errProxy := framework.GetServicesProxyRequest(f.ClientSet, f.ClientSet.Core().RESTClient().Get()) @@ -202,7 +202,7 @@ func (logsProvider *esLogsProvider) ReadEntries(pod *loggingPod) []*logEntry { return nil } - entries := []*logEntry{} + entries := []logEntry{} // Iterate over the hits and populate the observed array. for _, e := range h { l, ok := e.(map[string]interface{}) @@ -223,22 +223,12 @@ func (logsProvider *esLogsProvider) ReadEntries(pod *loggingPod) []*logEntry { continue } - timestampString, ok := source["@timestamp"].(string) - if !ok { - framework.Logf("Timestamp not of the expected type: %T", source["@timestamp"]) - continue - } - timestamp, err := time.Parse(time.RFC3339, timestampString) - if err != nil { - framework.Logf("Timestamp was not in correct format: %s", timestampString) - continue - } - - entries = append(entries, &logEntry{ - Payload: msg, - Timestamp: timestamp, - }) + entries = append(entries, logEntry{Payload: msg}) } return entries } + +func (logsProvider *esLogsProvider) FluentdApplicationName() string { + return "fluentd-es" +} diff --git a/test/e2e/cluster-logging/sd.go b/test/e2e/cluster-logging/sd.go index 6906258946f..f30f9777739 100644 --- a/test/e2e/cluster-logging/sd.go +++ b/test/e2e/cluster-logging/sd.go @@ -39,8 +39,9 @@ var _ = framework.KubeDescribe("Cluster level logging using GCL", func() { gclLogsProvider, err := newGclLogsProvider(f) framework.ExpectNoError(err, "Failed to create GCL logs provider") - err = gclLogsProvider.EnsureWorking() - framework.ExpectNoError(err, "GCL is not working") + err = gclLogsProvider.Init() + defer gclLogsProvider.Cleanup() + framework.ExpectNoError(err, "Failed to init GCL logs provider") err = ensureSingleFluentdOnEachNode(f, gclLogsProvider.FluentdApplicationName()) framework.ExpectNoError(err, "Fluentd deployed incorrectly") diff --git a/test/e2e/cluster-logging/sd_load.go b/test/e2e/cluster-logging/sd_load.go index 7d9ad11a112..c6ff8f4103a 100644 --- a/test/e2e/cluster-logging/sd_load.go +++ b/test/e2e/cluster-logging/sd_load.go @@ -38,13 +38,17 @@ var _ = framework.KubeDescribe("Cluster level logging using GCL [Feature:Stackdr gclLogsProvider, err := newGclLogsProvider(f) framework.ExpectNoError(err, "Failed to create GCL logs provider") + err = gclLogsProvider.Init() + defer gclLogsProvider.Cleanup() + framework.ExpectNoError(err, "Failed to init GCL logs provider") + nodes := framework.GetReadySchedulableNodesOrDie(f.ClientSet).Items nodeCount := len(nodes) podCount := 30 * nodeCount loggingDuration := 10 * time.Minute linesPerSecond := 1000 * nodeCount linesPerPod := linesPerSecond * int(loggingDuration.Seconds()) / podCount - ingestionTimeout := 60 * time.Minute + ingestionTimeout := 20 * time.Minute By("Running logs generator pods") pods := []*loggingPod{} @@ -56,9 +60,6 @@ var _ = framework.KubeDescribe("Cluster level logging using GCL [Feature:Stackdr defer f.PodClient().Delete(podName, &meta_v1.DeleteOptions{}) } - By("Waiting for pods to succeed") - time.Sleep(loggingDuration) - By("Waiting for all log lines to be ingested") config := &loggingTestConfig{ LogsProvider: gclLogsProvider, @@ -79,12 +80,16 @@ var _ = framework.KubeDescribe("Cluster level logging using GCL [Feature:Stackdr gclLogsProvider, err := newGclLogsProvider(f) framework.ExpectNoError(err, "Failed to create GCL logs provider") + err = gclLogsProvider.Init() + defer gclLogsProvider.Cleanup() + framework.ExpectNoError(err, "Failed to init GCL logs provider") + nodes := framework.GetReadySchedulableNodesOrDie(f.ClientSet).Items maxPodCount := 10 jobDuration := 1 * time.Minute linesPerPodPerSecond := 100 testDuration := 10 * time.Minute - ingestionTimeout := 60 * time.Minute + ingestionTimeout := 20 * time.Minute podRunDelay := time.Duration(int64(jobDuration) / int64(maxPodCount)) podRunCount := int(testDuration.Seconds())/int(podRunDelay.Seconds()) - 1 @@ -102,9 +107,6 @@ var _ = framework.KubeDescribe("Cluster level logging using GCL [Feature:Stackdr time.Sleep(podRunDelay) } - By("Waiting for the last pods to finish") - time.Sleep(jobDuration) - By("Waiting for all log lines to be ingested") config := &loggingTestConfig{ LogsProvider: gclLogsProvider, diff --git a/test/e2e/cluster-logging/sd_utils.go b/test/e2e/cluster-logging/sd_utils.go index 30705f36c54..e3c50ba8fbe 100644 --- a/test/e2e/cluster-logging/sd_utils.go +++ b/test/e2e/cluster-logging/sd_utils.go @@ -17,36 +17,46 @@ limitations under the License. package e2e import ( + "encoding/base64" + "encoding/json" "fmt" + "sync" "time" + "k8s.io/apimachinery/pkg/util/wait" "k8s.io/kubernetes/test/e2e/framework" "golang.org/x/net/context" "golang.org/x/oauth2/google" gcl "google.golang.org/api/logging/v2beta1" + pubsub "google.golang.org/api/pubsub/v1" ) const ( - // GCL doesn't support page size more than 1000 - gclPageSize = 1000 + // The amount of time to wait before considering + // Stackdriver Logging sink operational + sinkInitialDelay = 1 * time.Minute - // If we failed to get response from GCL, it can be a random 500 or - // quota limit exceeded. So we retry for some time in case the problem will go away. - // Quota is enforced every 100 seconds, so we have to wait for more than - // that to reliably get the next portion. - queryGclRetryDelay = 100 * time.Second - queryGclRetryTimeout = 250 * time.Second + // The limit on the number of messages to pull from PubSub + maxPullLogMessages = 100 * 1000 + + // The limit on the number of messages in the cache for a pod + maxCachedMessagesPerPod = 10 * 1000 + + // PubSub topic with log entries polling interval + gclLoggingPollInterval = 100 * time.Millisecond ) type gclLogsProvider struct { - GclService *gcl.Service - Framework *framework.Framework -} - -func (gclLogsProvider *gclLogsProvider) EnsureWorking() error { - // We assume that GCL is always working - return nil + GclService *gcl.Service + PubsubService *pubsub.Service + Framework *framework.Framework + Topic *pubsub.Topic + Subscription *pubsub.Subscription + LogSink *gcl.LogSink + LogEntryCache map[string]chan logEntry + CacheMutex *sync.Mutex + PollingStopChannel chan struct{} } func newGclLogsProvider(f *framework.Framework) (*gclLogsProvider, error) { @@ -57,75 +67,206 @@ func newGclLogsProvider(f *framework.Framework) (*gclLogsProvider, error) { return nil, err } + pubsubService, err := pubsub.New(hc) + if err != nil { + return nil, err + } + provider := &gclLogsProvider{ - GclService: gclService, - Framework: f, + GclService: gclService, + PubsubService: pubsubService, + Framework: f, + LogEntryCache: map[string]chan logEntry{}, + CacheMutex: &sync.Mutex{}, + PollingStopChannel: make(chan struct{}, 1), } return provider, nil } +func (gclLogsProvider *gclLogsProvider) Init() error { + projectId := framework.TestContext.CloudConfig.ProjectID + nsName := gclLogsProvider.Framework.Namespace.Name + + topic, err := gclLogsProvider.createPubSubTopic(projectId, nsName) + if err != nil { + return fmt.Errorf("failed to create PubSub topic: %v", err) + } + gclLogsProvider.Topic = topic + + subs, err := gclLogsProvider.createPubSubSubscription(projectId, nsName, topic.Name) + if err != nil { + return fmt.Errorf("failed to create PubSub subscription: %v", err) + } + gclLogsProvider.Subscription = subs + + logSink, err := gclLogsProvider.createGclLogSink(projectId, nsName, nsName, topic.Name) + if err != nil { + return fmt.Errorf("failed to create Stackdriver Logging sink: %v", err) + } + gclLogsProvider.LogSink = logSink + + if err = gclLogsProvider.authorizeGclLogSink(); err != nil { + return fmt.Errorf("failed to authorize log sink: %v", err) + } + + framework.Logf("Waiting for log sink to become operational") + // TODO: Replace with something more intelligent + time.Sleep(sinkInitialDelay) + + go gclLogsProvider.pollLogs() + + return nil +} + +func (gclLogsProvider *gclLogsProvider) createPubSubTopic(projectId, topicName string) (*pubsub.Topic, error) { + topicFullName := fmt.Sprintf("projects/%s/topics/%s", projectId, topicName) + topic := &pubsub.Topic{ + Name: topicFullName, + } + return gclLogsProvider.PubsubService.Projects.Topics.Create(topicFullName, topic).Do() +} + +func (gclLogsProvider *gclLogsProvider) createPubSubSubscription(projectId, subsName, topicName string) (*pubsub.Subscription, error) { + subsFullName := fmt.Sprintf("projects/%s/subscriptions/%s", projectId, subsName) + subs := &pubsub.Subscription{ + Name: subsFullName, + Topic: topicName, + } + return gclLogsProvider.PubsubService.Projects.Subscriptions.Create(subsFullName, subs).Do() +} + +func (gclLogsProvider *gclLogsProvider) createGclLogSink(projectId, nsName, sinkName, topicName string) (*gcl.LogSink, error) { + projectDst := fmt.Sprintf("projects/%s", projectId) + filter := fmt.Sprintf("resource.labels.namespace_id=%s AND resource.labels.container_name=%s", nsName, loggingContainerName) + sink := &gcl.LogSink{ + Name: sinkName, + Destination: fmt.Sprintf("pubsub.googleapis.com/%s", topicName), + Filter: filter, + } + return gclLogsProvider.GclService.Projects.Sinks.Create(projectDst, sink).Do() +} + +func (gclLogsProvider *gclLogsProvider) authorizeGclLogSink() error { + topicsService := gclLogsProvider.PubsubService.Projects.Topics + policy, err := topicsService.GetIamPolicy(gclLogsProvider.Topic.Name).Do() + if err != nil { + return err + } + + binding := &pubsub.Binding{ + Role: "roles/pubsub.publisher", + Members: []string{gclLogsProvider.LogSink.WriterIdentity}, + } + policy.Bindings = append(policy.Bindings, binding) + req := &pubsub.SetIamPolicyRequest{Policy: policy} + if _, err = topicsService.SetIamPolicy(gclLogsProvider.Topic.Name, req).Do(); err != nil { + return err + } + + return nil +} + +func (gclLogsProvider *gclLogsProvider) pollLogs() { + wait.PollUntil(gclLoggingPollInterval, func() (bool, error) { + subsName := gclLogsProvider.Subscription.Name + subsService := gclLogsProvider.PubsubService.Projects.Subscriptions + req := &pubsub.PullRequest{ + ReturnImmediately: true, + MaxMessages: maxPullLogMessages, + } + resp, err := subsService.Pull(subsName, req).Do() + if err != nil { + framework.Logf("Failed to pull messaged from PubSub due to %v", err) + return false, nil + } + + ids := []string{} + for _, msg := range resp.ReceivedMessages { + ids = append(ids, msg.AckId) + + logEntryEncoded, err := base64.StdEncoding.DecodeString(msg.Message.Data) + if err != nil { + framework.Logf("Got a message from pubsub that is not base64-encoded: %s", msg.Message.Data) + continue + } + + var gclLogEntry gcl.LogEntry + if err := json.Unmarshal(logEntryEncoded, &gclLogEntry); err != nil { + framework.Logf("Failed to decode a pubsub message '%s': %v", logEntryEncoded, err) + continue + } + + podName := gclLogEntry.Resource.Labels["pod_id"] + ch := gclLogsProvider.getCacheChannel(podName) + ch <- logEntry{Payload: gclLogEntry.TextPayload} + } + + if len(ids) > 0 { + ackReq := &pubsub.AcknowledgeRequest{AckIds: ids} + if _, err = subsService.Acknowledge(subsName, ackReq).Do(); err != nil { + framework.Logf("Failed to ack: %v", err) + } + } + + return false, nil + }, gclLogsProvider.PollingStopChannel) +} + +func (gclLogsProvider *gclLogsProvider) Cleanup() { + gclLogsProvider.PollingStopChannel <- struct{}{} + + if gclLogsProvider.LogSink != nil { + projectId := framework.TestContext.CloudConfig.ProjectID + sinkNameId := fmt.Sprintf("projects/%s/sinks/%s", projectId, gclLogsProvider.LogSink.Name) + sinksService := gclLogsProvider.GclService.Projects.Sinks + if _, err := sinksService.Delete(sinkNameId).Do(); err != nil { + framework.Logf("Failed to delete LogSink: %v", err) + } + } + + if gclLogsProvider.Subscription != nil { + subsService := gclLogsProvider.PubsubService.Projects.Subscriptions + if _, err := subsService.Delete(gclLogsProvider.Subscription.Name).Do(); err != nil { + framework.Logf("Failed to delete PubSub subscription: %v", err) + } + } + + if gclLogsProvider.Topic != nil { + topicsService := gclLogsProvider.PubsubService.Projects.Topics + if _, err := topicsService.Delete(gclLogsProvider.Topic.Name).Do(); err != nil { + framework.Logf("Failed to delete PubSub topic: %v", err) + } + } +} + +func (gclLogsProvider *gclLogsProvider) ReadEntries(pod *loggingPod) []logEntry { + var entries []logEntry + ch := gclLogsProvider.getCacheChannel(pod.Name) +polling_loop: + for { + select { + case entry := <-ch: + entries = append(entries, entry) + default: + break polling_loop + } + } + return entries +} + func (logsProvider *gclLogsProvider) FluentdApplicationName() string { return "fluentd-gcp" } -// Since GCL API is not easily available from the outside of cluster -// we use gcloud command to perform search with filter -func (gclLogsProvider *gclLogsProvider) ReadEntries(pod *loggingPod) []*logEntry { - filter := fmt.Sprintf("resource.labels.pod_id=%s AND resource.labels.namespace_id=%s AND timestamp>=\"%v\"", - pod.Name, gclLogsProvider.Framework.Namespace.Name, pod.LastTimestamp.Format(time.RFC3339)) - framework.Logf("Reading entries from GCL with filter '%v'", filter) +func (gclLogsProvider *gclLogsProvider) getCacheChannel(podName string) chan logEntry { + gclLogsProvider.CacheMutex.Lock() + defer gclLogsProvider.CacheMutex.Unlock() - response := getResponseSafe(gclLogsProvider.GclService, filter, "") - - var entries []*logEntry - for response != nil && len(response.Entries) > 0 { - framework.Logf("Received %d entries from GCL", len(response.Entries)) - - for _, entry := range response.Entries { - if entry.TextPayload == "" { - continue - } - - timestamp, parseErr := time.Parse(time.RFC3339, entry.Timestamp) - if parseErr != nil { - continue - } - - entries = append(entries, &logEntry{ - Timestamp: timestamp, - Payload: entry.TextPayload, - }) - } - - nextToken := response.NextPageToken - if nextToken == "" { - break - } - - response = getResponseSafe(gclLogsProvider.GclService, filter, response.NextPageToken) + if ch, ok := gclLogsProvider.LogEntryCache[podName]; ok { + return ch } - return entries -} - -func getResponseSafe(gclService *gcl.Service, filter string, pageToken string) *gcl.ListLogEntriesResponse { - for start := time.Now(); time.Since(start) < queryGclRetryTimeout; time.Sleep(queryGclRetryDelay) { - response, err := gclService.Entries.List(&gcl.ListLogEntriesRequest{ - ProjectIds: []string{ - framework.TestContext.CloudConfig.ProjectID, - }, - OrderBy: "timestamp desc", - Filter: filter, - PageSize: int64(gclPageSize), - PageToken: pageToken, - }).Do() - - if err == nil { - return response - } - - framework.Logf("Failed to get response from GCL due to %v, retrying", err) - } - - return nil + newCh := make(chan logEntry, maxCachedMessagesPerPod) + gclLogsProvider.LogEntryCache[podName] = newCh + return newCh } diff --git a/test/e2e/cluster-logging/utils.go b/test/e2e/cluster-logging/utils.go index 5149ff02939..703026419a9 100644 --- a/test/e2e/cluster-logging/utils.go +++ b/test/e2e/cluster-logging/utils.go @@ -33,13 +33,16 @@ import ( const ( // Duration of delay between any two attempts to check if all logs are ingested - ingestionRetryDelay = 100 * time.Second + ingestionRetryDelay = 30 * time.Second // Amount of requested cores for logging container in millicores loggingContainerCpuRequest = 10 // Amount of requested memory for logging container in bytes loggingContainerMemoryRequest = 10 * 1024 * 1024 + + // Name of the container used for logging tests + loggingContainerName = "logging-container" ) var ( @@ -51,26 +54,21 @@ var ( type loggingPod struct { // Name of the pod Name string - // If we didn't read some log entries, their - // timestamps should be no less than this timestamp. - // Effectively, timestamp of the last ingested entry - // for which there's no missing entry before it - LastTimestamp time.Time // Cache of ingested and read entries - Occurrences map[int]*logEntry + Occurrences map[int]logEntry // Number of lines expected to be ingested from this pod ExpectedLinesNumber int } type logEntry struct { - Payload string - Timestamp time.Time + Payload string } type logsProvider interface { + Init() error + Cleanup() + ReadEntries(*loggingPod) []logEntry FluentdApplicationName() string - EnsureWorking() error - ReadEntries(*loggingPod) []*logEntry } type loggingTestConfig struct { @@ -81,7 +79,7 @@ type loggingTestConfig struct { MaxAllowedFluentdRestarts int } -func (entry *logEntry) getLogEntryNumber() (int, bool) { +func (entry logEntry) getLogEntryNumber() (int, bool) { submatch := logEntryMessageRegex.FindStringSubmatch(entry.Payload) if submatch == nil || len(submatch) < 2 { return 0, false @@ -96,10 +94,8 @@ func createLoggingPod(f *framework.Framework, podName string, nodeName string, t createLogsGeneratorPod(f, podName, nodeName, totalLines, loggingDuration) return &loggingPod{ - Name: podName, - // It's used to avoid querying logs from before the pod was started - LastTimestamp: time.Now(), - Occurrences: make(map[int]*logEntry), + Name: podName, + Occurrences: make(map[int]logEntry), ExpectedLinesNumber: totalLines, } } @@ -113,7 +109,7 @@ func createLogsGeneratorPod(f *framework.Framework, podName string, nodeName str RestartPolicy: api_v1.RestartPolicyNever, Containers: []api_v1.Container{ { - Name: podName, + Name: loggingContainerName, Image: "gcr.io/google_containers/logs-generator:v0.1.0", Env: []api_v1.EnvVar{ { @@ -146,7 +142,7 @@ func waitForSomeLogs(f *framework.Framework, config *loggingTestConfig) error { podHasIngestedLogs := make([]bool, len(config.Pods)) podWithIngestedLogsCount := 0 - for start := time.Now(); podWithIngestedLogsCount < len(config.Pods) && time.Since(start) < config.IngestionTimeout; time.Sleep(ingestionRetryDelay) { + for start := time.Now(); time.Since(start) < config.IngestionTimeout; time.Sleep(ingestionRetryDelay) { for podIdx, pod := range config.Pods { if podHasIngestedLogs[podIdx] { continue @@ -167,6 +163,10 @@ func waitForSomeLogs(f *framework.Framework, config *loggingTestConfig) error { } } } + + if podWithIngestedLogsCount == len(config.Pods) { + break + } } if podWithIngestedLogsCount < len(config.Pods) { @@ -189,7 +189,7 @@ func waitForFullLogsIngestion(f *framework.Framework, config *loggingTestConfig) missingByPod[podIdx] = pod.ExpectedLinesNumber } - for start := time.Now(); totalMissing > 0 && time.Since(start) < config.IngestionTimeout; time.Sleep(ingestionRetryDelay) { + for start := time.Now(); time.Since(start) < config.IngestionTimeout; time.Sleep(ingestionRetryDelay) { missing := 0 for podIdx, pod := range config.Pods { if missingByPod[podIdx] == 0 { @@ -203,6 +203,8 @@ func waitForFullLogsIngestion(f *framework.Framework, config *loggingTestConfig) totalMissing = missing if totalMissing > 0 { framework.Logf("Still missing %d lines in total", totalMissing) + } else { + break } } @@ -245,19 +247,14 @@ func pullMissingLogsCount(logsProvider logsProvider, pod *loggingPod) int { if err != nil { framework.Logf("Failed to get missing lines count from pod %s due to %v", pod.Name, err) return pod.ExpectedLinesNumber - } else if missingOnPod > 0 { - framework.Logf("Pod %s is missing %d lines", pod.Name, missingOnPod) - } else { - framework.Logf("All logs from pod %s are ingested", pod.Name) } + return missingOnPod } func getMissingLinesCount(logsProvider logsProvider, pod *loggingPod) (int, error) { entries := logsProvider.ReadEntries(pod) - framework.Logf("Got %d entries from provider", len(entries)) - for _, entry := range entries { lineNumber, ok := entry.getLogEntryNumber() if !ok { @@ -271,17 +268,6 @@ func getMissingLinesCount(logsProvider logsProvider, pod *loggingPod) (int, erro } } - for i := 0; i < pod.ExpectedLinesNumber; i++ { - entry, ok := pod.Occurrences[i] - if !ok { - break - } - - if entry.Timestamp.After(pod.LastTimestamp) { - pod.LastTimestamp = entry.Timestamp - } - } - return pod.ExpectedLinesNumber - len(pod.Occurrences), nil }