diff --git a/pkg/memlogd/cmd/logread/main.go b/pkg/memlogd/cmd/logread/main.go index 419b25216..ff1d2ee90 100644 --- a/pkg/memlogd/cmd/logread/main.go +++ b/pkg/memlogd/cmd/logread/main.go @@ -1,10 +1,12 @@ package main import ( - "bufio" + "encoding/json" "flag" + "fmt" "net" - "os" + "strings" + "time" ) const ( @@ -13,6 +15,16 @@ const ( logDumpFollow ) +type logEntry struct { + Time time.Time `json:"time"` + Source string `json:"source"` + Msg string `json:"msg"` +} + +func (msg *logEntry) String() string { + return fmt.Sprintf("%s;%s;%s", msg.Time.Format(time.RFC3339Nano), strings.ReplaceAll(msg.Source, `;`, `\;`), msg.Msg) +} + func main() { var err error @@ -49,7 +61,13 @@ func main() { panic(err) } - r := bufio.NewReader(conn) - r.WriteTo(os.Stdout) + var entry logEntry + decoder := json.NewDecoder(conn) + for { + if err := decoder.Decode(&entry); err != nil { + panic(err) + } + fmt.Println(entry.String()) + } } diff --git a/pkg/memlogd/cmd/memlogd/main.go b/pkg/memlogd/cmd/memlogd/main.go index d9140c91e..7f8dbeea7 100644 --- a/pkg/memlogd/cmd/memlogd/main.go +++ b/pkg/memlogd/cmd/memlogd/main.go @@ -5,9 +5,9 @@ import ( "bytes" "container/list" "container/ring" + "encoding/json" "flag" "fmt" - "io" "log" "net" "os" @@ -18,9 +18,13 @@ import ( ) type logEntry struct { - time time.Time - source string - msg string + Time time.Time `json:"time"` + Source string `json:"source"` + Msg string `json:"msg"` +} + +func (msg *logEntry) String() string { + return fmt.Sprintf("%s;%s;%s", msg.Time.Format(time.RFC3339Nano), strings.ReplaceAll(msg.Source, `;`, `\;`), msg.Msg) } type fdMessage struct { @@ -49,26 +53,25 @@ type connListener struct { } func doLog(logCh chan logEntry, msg string) { - logCh <- logEntry{time: time.Now(), source: "memlogd", msg: msg} - return + logCh <- logEntry{ + Time: time.Now(), + Source: "memlogd", + Msg: msg, + } } func logQueryHandler(l *connListener) { defer l.conn.Close() + encoder := json.NewEncoder(l.conn) for msg := range l.output { - _, err := io.Copy(l.conn, strings.NewReader(msg.String()+"\n")) - if err != nil { + if err := encoder.Encode(msg); err != nil { l.err = err return } } } -func (msg *logEntry) String() string { - return fmt.Sprintf("%s,%s;%s", msg.time.Format(time.RFC3339Nano), msg.source, msg.msg) -} - func ringBufferHandler(ringSize, chanSize int, logCh chan logEntry, queryMsgChan chan queryMessage) { // Anything that interacts with the ring buffer goes through this handler ring := ring.New(ringSize) @@ -77,7 +80,8 @@ func ringBufferHandler(ringSize, chanSize int, logCh chan logEntry, queryMsgChan for { select { case msg := <-logCh: - fmt.Printf("%s\n", msg.String()) + fmt.Println(msg.String()) + // add log entry ring.Value = msg ring = ring.Next() @@ -213,7 +217,11 @@ func readLogFromFd(maxLineLen int, fd int, source string, logCh chan logEntry) { if buffer.Len() > maxLineLen { buffer.Truncate(maxLineLen) } - logCh <- logEntry{time: time.Now(), source: source, msg: buffer.String()} + logCh <- logEntry{ + Time: time.Now(), + Source: source, + Msg: buffer.String(), + } buffer.Reset() l, isPrefix, err = r.ReadLine() @@ -221,19 +229,8 @@ func readLogFromFd(maxLineLen int, fd int, source string, logCh chan logEntry) { } func loggingRequestHandler(lineMaxLength int, logCh chan logEntry, fdMsgChan chan fdMessage) { - for true { - select { - case msg := <-fdMsgChan: // incoming fd - if strings.Contains(msg.name, ";") { - // The log message spec bans ";" in the log names - doLog(logCh, fmt.Sprintf("ERROR: cannot register log with name '%s' as it contains ;", msg.name)) - if err := syscall.Close(msg.fd); err != nil { - doLog(logCh, fmt.Sprintf("ERROR: failed to close fd: %s", err)) - } - continue - } - go readLogFromFd(lineMaxLength, msg.fd, msg.name, logCh) - } + for msg := range fdMsgChan { + go readLogFromFd(lineMaxLength, msg.fd, msg.name, logCh) } } diff --git a/pkg/memlogd/cmd/memlogd/main_test.go b/pkg/memlogd/cmd/memlogd/main_test.go index b2d6c72dc..4f48554ef 100644 --- a/pkg/memlogd/cmd/memlogd/main_test.go +++ b/pkg/memlogd/cmd/memlogd/main_test.go @@ -6,7 +6,6 @@ import ( "log" "net" "os" - "strings" "syscall" "testing" "time" @@ -24,7 +23,7 @@ func TestNonblock(t *testing.T) { // Overflow the log to make sure it doesn't block for i := 0; i < 2*linesInBuffer; i++ { select { - case logCh <- logEntry{time: time.Now(), source: "memlogd", msg: "hello TestNonblock"}: + case logCh <- logEntry{Time: time.Now(), Source: "memlogd", Msg: "hello TestNonblock"}: continue case <-time.After(time.Second): t.Errorf("write to the logger blocked for over 1s after %d (size was set to %d)", i, linesInBuffer) @@ -43,7 +42,7 @@ func TestFinite(t *testing.T) { // Overflow the log by 2x for i := 0; i < 2*linesInBuffer; i++ { - logCh <- logEntry{time: time.Now(), source: "memlogd", msg: "hello TestFinite"} + logCh <- logEntry{Time: time.Now(), Source: "memlogd", Msg: "hello TestFinite"} } a, b := loopback() defer a.Close() @@ -83,7 +82,7 @@ func TestFinite2(t *testing.T) { // fill the ring for i := 0; i < linesInBuffer; i++ { - logCh <- logEntry{time: time.Now(), source: "memlogd", msg: "hello TestFinite2"} + logCh <- logEntry{Time: time.Now(), Source: "memlogd", Msg: "hello TestFinite2"} } a, b := loopback() @@ -113,63 +112,6 @@ func TestFinite2(t *testing.T) { } } -func TestGoodName(t *testing.T) { - // Test that the source names can't contain ";" - linesInBuffer := 10 - logCh := make(chan logEntry) - fdMsgChan := make(chan fdMessage) - queryMsgChan := make(chan queryMessage) - - go ringBufferHandler(linesInBuffer, linesInBuffer, logCh, queryMsgChan) - go loggingRequestHandler(80, logCh, fdMsgChan) - - fds, err := syscall.Socketpair(syscall.AF_UNIX, syscall.SOCK_STREAM, 0) - if err != nil { - log.Fatal("Unable to create socketpair: ", err) - } - a := fdToConn(fds[0]) - b := fdToConn(fds[1]) - defer a.Close() - defer b.Close() - // defer close fds - - fdMsgChan <- fdMessage{ - name: "semi-colons are banned;", - fd: fds[0], - } - // although the fd should be rejected my memlogd the Write should be buffered - // by the kernel and not block. - if _, err := b.Write([]byte("hello\n")); err != nil { - log.Fatalf("Failed to write log message: %s", err) - } - c, d := loopback() - defer c.Close() - defer d.Close() - // this log should not be in the ring because the connection was rejected. - queryM := queryMessage{ - conn: c, - mode: logDumpFollow, - } - queryMsgChan <- queryM - // The error log is generated asynchronously. It should be fast. On error time out - // after 5s. - d.SetDeadline(time.Now().Add(5 * time.Second)) - r := bufio.NewReader(d) - for { - line, err := r.ReadString('\n') - if err == io.EOF { - break - } - if err != nil { - log.Fatalf("Unexpected error reading from socket: %s", err) - } - if strings.Contains(line, "ERROR: cannot register log") { - return - } - } - t.Fatal("Failed to read error message when registering a log with a ;") -} - // caller must close fd themselves: closing the net.Conn will not close fd. func fdToConn(fd int) net.Conn { f := os.NewFile(uintptr(fd), "")