Write log entries as json

Signed-off-by: David Gageot <david.gageot@docker.com>
This commit is contained in:
David Gageot 2022-07-22 14:16:36 +02:00
parent 3f25e09ab5
commit 0b136bf80d
No known key found for this signature in database
GPG Key ID: 93C3F22BE5D3A40B
3 changed files with 63 additions and 106 deletions

View File

@ -1,10 +1,12 @@
package main package main
import ( import (
"bufio" "encoding/json"
"flag" "flag"
"fmt"
"net" "net"
"os" "strings"
"time"
) )
const ( const (
@ -13,6 +15,16 @@ const (
logDumpFollow logDumpFollow
) )
type LogEntry struct {
Time time.Time `json:"time"`
Source string `json:"source"`
Msg string `json:"msg"`
}
func (msg *LogEntry) String() string {
return fmt.Sprintf("%s;%s;%s", msg.Time.Format(time.RFC3339Nano), strings.ReplaceAll(msg.Source, `;`, `\;`), msg.Msg)
}
func main() { func main() {
var err error var err error
@ -49,7 +61,13 @@ func main() {
panic(err) panic(err)
} }
r := bufio.NewReader(conn) var entry LogEntry
r.WriteTo(os.Stdout) decoder := json.NewDecoder(conn)
for {
if err := decoder.Decode(&entry); err != nil {
panic(err)
}
fmt.Println(entry.String())
}
} }

View File

@ -5,9 +5,9 @@ import (
"bytes" "bytes"
"container/list" "container/list"
"container/ring" "container/ring"
"encoding/json"
"flag" "flag"
"fmt" "fmt"
"io"
"log" "log"
"net" "net"
"os" "os"
@ -17,10 +17,14 @@ import (
"time" "time"
) )
type logEntry struct { type LogEntry struct {
time time.Time Time time.Time `json:"time"`
source string Source string `json:"source"`
msg string Msg string `json:"msg"`
}
func (msg *LogEntry) String() string {
return fmt.Sprintf("%s;%s;%s", msg.Time.Format(time.RFC3339Nano), strings.ReplaceAll(msg.Source, `;`, `\;`), msg.Msg)
} }
type fdMessage struct { type fdMessage struct {
@ -43,33 +47,32 @@ type queryMessage struct {
type connListener struct { type connListener struct {
conn net.Conn conn net.Conn
output chan *logEntry output chan *LogEntry
err error err error
exitOnEOF bool // exit instead of blocking if no more data in read buffer exitOnEOF bool // exit instead of blocking if no more data in read buffer
} }
func doLog(logCh chan logEntry, msg string) { func doLog(logCh chan LogEntry, msg string) {
logCh <- logEntry{time: time.Now(), source: "memlogd", msg: msg} logCh <- LogEntry{
return Time: time.Now(),
Source: "memlogd",
Msg: msg,
}
} }
func logQueryHandler(l *connListener) { func logQueryHandler(l *connListener) {
defer l.conn.Close() defer l.conn.Close()
encoder := json.NewEncoder(l.conn)
for msg := range l.output { for msg := range l.output {
_, err := io.Copy(l.conn, strings.NewReader(msg.String()+"\n")) if err := encoder.Encode(msg); err != nil {
if err != nil {
l.err = err l.err = err
return return
} }
} }
} }
func (msg *logEntry) String() string { func ringBufferHandler(ringSize, chanSize int, logCh chan LogEntry, queryMsgChan chan queryMessage) {
return fmt.Sprintf("%s,%s;%s", msg.time.Format(time.RFC3339Nano), msg.source, msg.msg)
}
func ringBufferHandler(ringSize, chanSize int, logCh chan logEntry, queryMsgChan chan queryMessage) {
// Anything that interacts with the ring buffer goes through this handler // Anything that interacts with the ring buffer goes through this handler
ring := ring.New(ringSize) ring := ring.New(ringSize)
listeners := list.New() listeners := list.New()
@ -77,7 +80,8 @@ func ringBufferHandler(ringSize, chanSize int, logCh chan logEntry, queryMsgChan
for { for {
select { select {
case msg := <-logCh: case msg := <-logCh:
fmt.Printf("%s\n", msg.String()) fmt.Println(msg.String())
// add log entry // add log entry
ring.Value = msg ring.Value = msg
ring = ring.Next() ring = ring.Next()
@ -108,7 +112,7 @@ func ringBufferHandler(ringSize, chanSize int, logCh chan logEntry, queryMsgChan
case msg := <-queryMsgChan: case msg := <-queryMsgChan:
l := connListener{ l := connListener{
conn: msg.conn, conn: msg.conn,
output: make(chan *logEntry, chanSize), output: make(chan *LogEntry, chanSize),
err: nil, err: nil,
exitOnEOF: (msg.mode == logDump), exitOnEOF: (msg.mode == logDump),
} }
@ -120,7 +124,7 @@ func ringBufferHandler(ringSize, chanSize int, logCh chan logEntry, queryMsgChan
if msg.mode == logDumpFollow || msg.mode == logDump { if msg.mode == logDumpFollow || msg.mode == logDump {
// fill with current data in buffer // fill with current data in buffer
ring.Do(func(f interface{}) { ring.Do(func(f interface{}) {
if msg, ok := f.(logEntry); ok { if msg, ok := f.(LogEntry); ok {
select { select {
case l.output <- &msg: case l.output <- &msg:
default: default:
@ -136,7 +140,7 @@ func ringBufferHandler(ringSize, chanSize int, logCh chan logEntry, queryMsgChan
} }
} }
func receiveQueryHandler(l *net.UnixListener, logCh chan logEntry, queryMsgChan chan queryMessage) { func receiveQueryHandler(l *net.UnixListener, logCh chan LogEntry, queryMsgChan chan queryMessage) {
for { for {
var conn *net.UnixConn var conn *net.UnixConn
var err error var err error
@ -153,7 +157,7 @@ func receiveQueryHandler(l *net.UnixListener, logCh chan logEntry, queryMsgChan
} }
} }
func receiveFdHandler(conn *net.UnixConn, logCh chan logEntry, fdMsgChan chan fdMessage) { func receiveFdHandler(conn *net.UnixConn, logCh chan LogEntry, fdMsgChan chan fdMessage) {
oob := make([]byte, 512) oob := make([]byte, 512)
b := make([]byte, 512) b := make([]byte, 512)
@ -191,7 +195,7 @@ func receiveFdHandler(conn *net.UnixConn, logCh chan logEntry, fdMsgChan chan fd
} }
} }
func readLogFromFd(maxLineLen int, fd int, source string, logCh chan logEntry) { func readLogFromFd(maxLineLen int, fd int, source string, logCh chan LogEntry) {
f := os.NewFile(uintptr(fd), "") f := os.NewFile(uintptr(fd), "")
defer f.Close() defer f.Close()
@ -213,27 +217,20 @@ func readLogFromFd(maxLineLen int, fd int, source string, logCh chan logEntry) {
if buffer.Len() > maxLineLen { if buffer.Len() > maxLineLen {
buffer.Truncate(maxLineLen) buffer.Truncate(maxLineLen)
} }
logCh <- logEntry{time: time.Now(), source: source, msg: buffer.String()} logCh <- LogEntry{
Time: time.Now(),
Source: source,
Msg: buffer.String(),
}
buffer.Reset() buffer.Reset()
l, isPrefix, err = r.ReadLine() l, isPrefix, err = r.ReadLine()
} }
} }
func loggingRequestHandler(lineMaxLength int, logCh chan logEntry, fdMsgChan chan fdMessage) { func loggingRequestHandler(lineMaxLength int, logCh chan LogEntry, fdMsgChan chan fdMessage) {
for true { for msg := range fdMsgChan {
select { go readLogFromFd(lineMaxLength, msg.fd, msg.name, logCh)
case msg := <-fdMsgChan: // incoming fd
if strings.Contains(msg.name, ";") {
// The log message spec bans ";" in the log names
doLog(logCh, fmt.Sprintf("ERROR: cannot register log with name '%s' as it contains ;", msg.name))
if err := syscall.Close(msg.fd); err != nil {
doLog(logCh, fmt.Sprintf("ERROR: failed to close fd: %s", err))
}
continue
}
go readLogFromFd(lineMaxLength, msg.fd, msg.name, logCh)
}
} }
} }
@ -317,7 +314,7 @@ func main() {
os.Exit(0) os.Exit(0)
} }
logCh := make(chan logEntry) logCh := make(chan LogEntry)
fdMsgChan := make(chan fdMessage) fdMsgChan := make(chan fdMessage)
queryMsgChan := make(chan queryMessage) queryMsgChan := make(chan queryMessage)

View File

@ -6,7 +6,6 @@ import (
"log" "log"
"net" "net"
"os" "os"
"strings"
"syscall" "syscall"
"testing" "testing"
"time" "time"
@ -16,7 +15,7 @@ func TestNonblock(t *testing.T) {
// Test that writes to the logger don't block because it is full // Test that writes to the logger don't block because it is full
linesInBuffer := 10 linesInBuffer := 10
logCh := make(chan logEntry) logCh := make(chan LogEntry)
queryMsgChan := make(chan queryMessage) queryMsgChan := make(chan queryMessage)
go ringBufferHandler(linesInBuffer, linesInBuffer, logCh, queryMsgChan) go ringBufferHandler(linesInBuffer, linesInBuffer, logCh, queryMsgChan)
@ -24,7 +23,7 @@ func TestNonblock(t *testing.T) {
// Overflow the log to make sure it doesn't block // Overflow the log to make sure it doesn't block
for i := 0; i < 2*linesInBuffer; i++ { for i := 0; i < 2*linesInBuffer; i++ {
select { select {
case logCh <- logEntry{time: time.Now(), source: "memlogd", msg: "hello TestNonblock"}: case logCh <- LogEntry{Time: time.Now(), Source: "memlogd", Msg: "hello TestNonblock"}:
continue continue
case <-time.After(time.Second): case <-time.After(time.Second):
t.Errorf("write to the logger blocked for over 1s after %d (size was set to %d)", i, linesInBuffer) t.Errorf("write to the logger blocked for over 1s after %d (size was set to %d)", i, linesInBuffer)
@ -36,14 +35,14 @@ func TestFinite(t *testing.T) {
// Test that the logger doesn't store more than its configured maximum size // Test that the logger doesn't store more than its configured maximum size
linesInBuffer := 10 linesInBuffer := 10
logCh := make(chan logEntry) logCh := make(chan LogEntry)
queryMsgChan := make(chan queryMessage) queryMsgChan := make(chan queryMessage)
go ringBufferHandler(linesInBuffer, linesInBuffer, logCh, queryMsgChan) go ringBufferHandler(linesInBuffer, linesInBuffer, logCh, queryMsgChan)
// Overflow the log by 2x // Overflow the log by 2x
for i := 0; i < 2*linesInBuffer; i++ { for i := 0; i < 2*linesInBuffer; i++ {
logCh <- logEntry{time: time.Now(), source: "memlogd", msg: "hello TestFinite"} logCh <- LogEntry{Time: time.Now(), Source: "memlogd", Msg: "hello TestFinite"}
} }
a, b := loopback() a, b := loopback()
defer a.Close() defer a.Close()
@ -76,14 +75,14 @@ func TestFinite2(t *testing.T) {
linesInBuffer := 10 linesInBuffer := 10
// the output buffer size will be 1/2 of the ring // the output buffer size will be 1/2 of the ring
outputBufferSize := linesInBuffer / 2 outputBufferSize := linesInBuffer / 2
logCh := make(chan logEntry) logCh := make(chan LogEntry)
queryMsgChan := make(chan queryMessage) queryMsgChan := make(chan queryMessage)
go ringBufferHandler(linesInBuffer, outputBufferSize, logCh, queryMsgChan) go ringBufferHandler(linesInBuffer, outputBufferSize, logCh, queryMsgChan)
// fill the ring // fill the ring
for i := 0; i < linesInBuffer; i++ { for i := 0; i < linesInBuffer; i++ {
logCh <- logEntry{time: time.Now(), source: "memlogd", msg: "hello TestFinite2"} logCh <- LogEntry{Time: time.Now(), Source: "memlogd", Msg: "hello TestFinite2"}
} }
a, b := loopback() a, b := loopback()
@ -113,63 +112,6 @@ func TestFinite2(t *testing.T) {
} }
} }
func TestGoodName(t *testing.T) {
// Test that the source names can't contain ";"
linesInBuffer := 10
logCh := make(chan logEntry)
fdMsgChan := make(chan fdMessage)
queryMsgChan := make(chan queryMessage)
go ringBufferHandler(linesInBuffer, linesInBuffer, logCh, queryMsgChan)
go loggingRequestHandler(80, logCh, fdMsgChan)
fds, err := syscall.Socketpair(syscall.AF_UNIX, syscall.SOCK_STREAM, 0)
if err != nil {
log.Fatal("Unable to create socketpair: ", err)
}
a := fdToConn(fds[0])
b := fdToConn(fds[1])
defer a.Close()
defer b.Close()
// defer close fds
fdMsgChan <- fdMessage{
name: "semi-colons are banned;",
fd: fds[0],
}
// although the fd should be rejected my memlogd the Write should be buffered
// by the kernel and not block.
if _, err := b.Write([]byte("hello\n")); err != nil {
log.Fatalf("Failed to write log message: %s", err)
}
c, d := loopback()
defer c.Close()
defer d.Close()
// this log should not be in the ring because the connection was rejected.
queryM := queryMessage{
conn: c,
mode: logDumpFollow,
}
queryMsgChan <- queryM
// The error log is generated asynchronously. It should be fast. On error time out
// after 5s.
d.SetDeadline(time.Now().Add(5 * time.Second))
r := bufio.NewReader(d)
for {
line, err := r.ReadString('\n')
if err == io.EOF {
break
}
if err != nil {
log.Fatalf("Unexpected error reading from socket: %s", err)
}
if strings.Contains(line, "ERROR: cannot register log") {
return
}
}
t.Fatal("Failed to read error message when registering a log with a ;")
}
// caller must close fd themselves: closing the net.Conn will not close fd. // caller must close fd themselves: closing the net.Conn will not close fd.
func fdToConn(fd int) net.Conn { func fdToConn(fd int) net.Conn {
f := os.NewFile(uintptr(fd), "") f := os.NewFile(uintptr(fd), "")