Implement e2e test for Stackdriver event exporter

This commit is contained in:
Mikhail Vyatskov
2017-06-21 23:24:43 +02:00
committed by Mik Vyatskov
parent 46fe7f062b
commit f11d8b9daa
3 changed files with 140 additions and 7 deletions

View File

@@ -40,8 +40,8 @@ const (
// The limit on the number of messages to pull from PubSub
maxPullLogMessages = 100 * 1000
// The limit on the number of messages in the cache for a pod
maxCachedMessagesPerPod = 10 * 1000
// The limit on the number of messages in the single cache
maxCacheSize = 10 * 1000
// PubSub topic with log entries polling interval
gclLoggingPollInterval = 100 * time.Millisecond
@@ -55,6 +55,7 @@ type gclLogsProvider struct {
Subscription *pubsub.Subscription
LogSink *gcl.LogSink
LogEntryCache map[string]chan logEntry
EventCache chan map[string]interface{}
CacheMutex *sync.Mutex
PollingStopChannel chan struct{}
}
@@ -77,6 +78,7 @@ func newGclLogsProvider(f *framework.Framework) (*gclLogsProvider, error) {
PubsubService: pubsubService,
Framework: f,
LogEntryCache: map[string]chan logEntry{},
EventCache: make(chan map[string]interface{}, maxCacheSize),
CacheMutex: &sync.Mutex{},
PollingStopChannel: make(chan struct{}, 1),
}
@@ -137,7 +139,9 @@ func (gclLogsProvider *gclLogsProvider) createPubSubSubscription(projectId, subs
func (gclLogsProvider *gclLogsProvider) createGclLogSink(projectId, nsName, sinkName, topicName string) (*gcl.LogSink, error) {
projectDst := fmt.Sprintf("projects/%s", projectId)
filter := fmt.Sprintf("resource.labels.namespace_id=%s AND resource.labels.container_name=%s", nsName, loggingContainerName)
filter := fmt.Sprintf("(resource.type=\"gke_cluster\" AND jsonPayload.kind=\"Event\" AND jsonPayload.metadata.namespace=\"%s\") OR "+
"(resource.type=\"container\" AND resource.labels.namespace_id=\"%s\")", nsName, nsName)
framework.Logf("Using the following filter for entries: %s", filter)
sink := &gcl.LogSink{
Name: sinkName,
Destination: fmt.Sprintf("pubsub.googleapis.com/%s", topicName),
@@ -196,9 +200,30 @@ func (gclLogsProvider *gclLogsProvider) pollLogs() {
continue
}
podName := gclLogEntry.Resource.Labels["pod_id"]
ch := gclLogsProvider.getCacheChannel(podName)
ch <- logEntry{Payload: gclLogEntry.TextPayload}
switch gclLogEntry.Resource.Type {
case "container":
podName := gclLogEntry.Resource.Labels["pod_id"]
ch := gclLogsProvider.getCacheChannel(podName)
ch <- logEntry{Payload: gclLogEntry.TextPayload}
break
case "gke_cluster":
jsonPayloadRaw, err := gclLogEntry.JsonPayload.MarshalJSON()
if err != nil {
framework.Logf("Failed to get jsonPayload from LogEntry %v", gclLogEntry)
break
}
var eventObject map[string]interface{}
err = json.Unmarshal(jsonPayloadRaw, &eventObject)
if err != nil {
framework.Logf("Failed to deserialize jsonPayload as json object %s", string(jsonPayloadRaw[:]))
break
}
gclLogsProvider.EventCache <- eventObject
break
default:
framework.Logf("Received LogEntry with unexpected resource type: %s", gclLogEntry.Resource.Type)
break
}
}
if len(ids) > 0 {
@@ -258,6 +283,20 @@ func (logsProvider *gclLogsProvider) FluentdApplicationName() string {
return "fluentd-gcp"
}
func (gclLogsProvider *gclLogsProvider) ReadEvents() []map[string]interface{} {
var events []map[string]interface{}
polling_loop:
for {
select {
case event := <-gclLogsProvider.EventCache:
events = append(events, event)
default:
break polling_loop
}
}
return events
}
func (gclLogsProvider *gclLogsProvider) getCacheChannel(podName string) chan logEntry {
gclLogsProvider.CacheMutex.Lock()
defer gclLogsProvider.CacheMutex.Unlock()
@@ -266,7 +305,7 @@ func (gclLogsProvider *gclLogsProvider) getCacheChannel(podName string) chan log
return ch
}
newCh := make(chan logEntry, maxCachedMessagesPerPod)
newCh := make(chan logEntry, maxCacheSize)
gclLogsProvider.LogEntryCache[podName] = newCh
return newCh
}